Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B3B4D18B5C for ; Tue, 26 May 2015 19:34:43 +0000 (UTC) Received: (qmail 32810 invoked by uid 500); 26 May 2015 19:34:32 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 32597 invoked by uid 500); 26 May 2015 19:34:32 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 31770 invoked by uid 99); 26 May 2015 19:34:31 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 26 May 2015 19:34:31 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BC351E0A1F; Tue, 26 May 2015 19:34:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zhz@apache.org To: common-commits@hadoop.apache.org Date: Tue, 26 May 2015 19:34:54 -0000 Message-Id: <9a8f2676a3c845ca9facd6cef94ab753@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [24/50] [abbrv] hadoop git commit: HDFS-8352. Erasure Coding: test webhdfs read write stripe file. (waltersu4549) HDFS-8352. Erasure Coding: test webhdfs read write stripe file. (waltersu4549) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a9197269 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a9197269 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a9197269 Branch: refs/heads/HDFS-7285 Commit: a9197269143f9d976a1565bbf4e383fac6e1326f Parents: c9103e9 Author: waltersu4549 Authored: Mon May 18 19:10:37 2015 +0800 Committer: Zhe Zhang Committed: Tue May 26 12:02:30 2015 -0700 ---------------------------------------------------------------------- .../hadoop/hdfs/TestWriteReadStripedFile.java | 267 ++++++++++--------- 1 file changed, 148 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9197269/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java index 57d6eb9..f78fb7a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java @@ -21,9 +21,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.web.ByteRangeInputStream; +import org.apache.hadoop.hdfs.web.WebHdfsConstants; +import org.apache.hadoop.hdfs.web.WebHdfsTestUtil; import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder; import org.junit.AfterClass; import org.junit.Assert; @@ -33,23 +37,26 @@ import org.junit.Test; import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Random; public class TestWriteReadStripedFile { private static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS; private static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS; - - private static DistributedFileSystem fs; private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; private final static int stripesPerBlock = 4; static int blockSize = cellSize * stripesPerBlock; static int numDNs = dataBlocks + parityBlocks + 2; private static MiniDFSCluster cluster; + private static Configuration conf; + private static FileSystem fs; + + private static Random r= new Random(); @BeforeClass public static void setup() throws IOException { - Configuration conf = new Configuration(); + conf = new Configuration(); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); cluster.getFileSystem().getClient().createErasureCodingZone("/", null); @@ -134,7 +141,7 @@ public class TestWriteReadStripedFile { @Test public void testFileMoreThanABlockGroup2() throws IOException { testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup2", - blockSize * dataBlocks + cellSize+ 123); + blockSize * dataBlocks + cellSize + 123); } @@ -171,7 +178,7 @@ public class TestWriteReadStripedFile { } private void assertSeekAndRead(FSDataInputStream fsdis, int pos, - int writeBytes) throws IOException { + int writeBytes) throws IOException { fsdis.seek(pos); byte[] buf = new byte[writeBytes]; int readLen = readAll(fsdis, buf); @@ -182,147 +189,169 @@ public class TestWriteReadStripedFile { } } - private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes) + private void testOneFileUsingDFSStripedInputStream(String src, int fileLength) throws IOException { - Path testPath = new Path(src); - final byte[] bytes = generateBytes(writeBytes); - DFSTestUtil.writeFile(fs, testPath, new String(bytes)); - //check file length - FileStatus status = fs.getFileStatus(testPath); - long fileLength = status.getLen(); + final byte[] expected = generateBytes(fileLength); + Path srcPath = new Path(src); + DFSTestUtil.writeFile(fs, srcPath, new String(expected)); + + verifyLength(fs, srcPath, fileLength); + + byte[] smallBuf = new byte[1024]; + byte[] largeBuf = new byte[fileLength + 100]; + verifyPread(fs, srcPath, fileLength, expected, largeBuf); + + verifyStatefulRead(fs, srcPath, fileLength, expected, largeBuf); + verifySeek(fs, srcPath, fileLength); + verifyStatefulRead(fs, srcPath, fileLength, expected, + ByteBuffer.allocate(fileLength + 100)); + verifyStatefulRead(fs, srcPath, fileLength, expected, smallBuf); + verifyStatefulRead(fs, srcPath, fileLength, expected, + ByteBuffer.allocate(1024)); + } + + @Test + public void testWriteReadUsingWebHdfs() throws Exception { + int fileLength = blockSize * dataBlocks + cellSize + 123; + + final byte[] expected = generateBytes(fileLength); + FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, + WebHdfsConstants.WEBHDFS_SCHEME); + Path srcPath = new Path("/testWriteReadUsingWebHdfs_stripe"); + DFSTestUtil.writeFile(fs, srcPath, new String(expected)); + + verifyLength(fs, srcPath, fileLength); + + byte[] smallBuf = new byte[1024]; + byte[] largeBuf = new byte[fileLength + 100]; + verifyPread(fs, srcPath, fileLength, expected, largeBuf); + + verifyStatefulRead(fs, srcPath, fileLength, expected, largeBuf); + verifySeek(fs, srcPath, fileLength); + verifyStatefulRead(fs, srcPath, fileLength, expected, smallBuf); + //webhdfs doesn't support bytebuffer read + + } + + void verifyLength(FileSystem fs, Path srcPath, int fileLength) + throws IOException { + FileStatus status = fs.getFileStatus(srcPath); Assert.assertEquals("File length should be the same", - writeBytes, fileLength); + fileLength, status.getLen()); + } - // pread - try (FSDataInputStream fsdis = fs.open(new Path(src))) { - byte[] buf = new byte[writeBytes + 100]; - int readLen = fsdis.read(0, buf, 0, buf.length); - readLen = readLen >= 0 ? readLen : 0; - Assert.assertEquals("The length of file should be the same to write size", - writeBytes, readLen); - for (int i = 0; i < writeBytes; i++) { - Assert.assertEquals("Byte at " + i + " should be the same", getByte(i), - buf[i]); + void verifyPread(FileSystem fs, Path srcPath, int fileLength, + byte[] expected, byte[] buf) throws IOException { + FSDataInputStream in = fs.open(srcPath); + int[] startOffsets = {0, 1, cellSize - 102, cellSize, cellSize + 102, + cellSize * (dataBlocks - 1), cellSize * (dataBlocks - 1) + 102, + cellSize * dataBlocks, fileLength - 102, fileLength - 1}; + for (int startOffset : startOffsets) { + startOffset = Math.max(0, Math.min(startOffset, fileLength - 1)); + int remaining = fileLength - startOffset; + in.readFully(startOffset, buf, 0, remaining); + for (int i = 0; i < remaining; i++) { + Assert.assertEquals("Byte at " + (startOffset + i) + " should be the " + + "same", + expected[startOffset + i], buf[i]); } } + in.close(); + } - // stateful read with byte array - try (FSDataInputStream fsdis = fs.open(new Path(src))) { - byte[] buf = new byte[writeBytes + 100]; - int readLen = readAll(fsdis, buf); - Assert.assertEquals("The length of file should be the same to write size", - writeBytes, readLen); - for (int i = 0; i < writeBytes; i++) { - Assert.assertEquals("Byte at " + i + " should be the same", getByte(i), - buf[i]); + void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength, + byte[] expected, byte[] buf) throws IOException { + FSDataInputStream in = fs.open(srcPath); + final byte[] result = new byte[fileLength]; + int readLen = 0; + int ret; + do { + ret = in.read(buf, 0, buf.length); + if (ret > 0) { + System.arraycopy(buf, 0, result, readLen, ret); + readLen += ret; } - } + } while (ret >= 0); + Assert.assertEquals("The length of file should be the same to write size", + fileLength, readLen); + Assert.assertArrayEquals(expected, result); + in.close(); + } - // seek and stateful read - try (FSDataInputStream fsdis = fs.open(new Path(src))) { - // seek to 1/2 of content - int pos = writeBytes/2; - assertSeekAndRead(fsdis, pos, writeBytes); - // seek to 1/3 of content - pos = writeBytes/3; - assertSeekAndRead(fsdis, pos, writeBytes); + void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength, + byte[] expected, ByteBuffer buf) throws IOException { + FSDataInputStream in = fs.open(srcPath); + ByteBuffer result = ByteBuffer.allocate(fileLength); + int readLen = 0; + int ret; + do { + ret = in.read(buf); + if (ret > 0) { + readLen += ret; + buf.flip(); + result.put(buf); + buf.clear(); + } + } while (ret >= 0); + readLen = readLen >= 0 ? readLen : 0; + Assert.assertEquals("The length of file should be the same to write size", + fileLength, readLen); + Assert.assertArrayEquals(expected, result.array()); + in.close(); + } - // seek to 0 pos - pos = 0; - assertSeekAndRead(fsdis, pos, writeBytes); - if (writeBytes > cellSize) { - // seek to cellSize boundary - pos = cellSize -1; - assertSeekAndRead(fsdis, pos, writeBytes); - } + void verifySeek(FileSystem fs, Path srcPath, int fileLength) + throws IOException { + FSDataInputStream in = fs.open(srcPath); + // seek to 1/2 of content + int pos = fileLength / 2; + assertSeekAndRead(in, pos, fileLength); + + // seek to 1/3 of content + pos = fileLength / 3; + assertSeekAndRead(in, pos, fileLength); + + // seek to 0 pos + pos = 0; + assertSeekAndRead(in, pos, fileLength); + + if (fileLength > cellSize) { + // seek to cellSize boundary + pos = cellSize - 1; + assertSeekAndRead(in, pos, fileLength); + } - if (writeBytes > cellSize * dataBlocks) { - // seek to striped cell group boundary - pos = cellSize * dataBlocks - 1; - assertSeekAndRead(fsdis, pos, writeBytes); - } + if (fileLength > cellSize * dataBlocks) { + // seek to striped cell group boundary + pos = cellSize * dataBlocks - 1; + assertSeekAndRead(in, pos, fileLength); + } - if (writeBytes > blockSize * dataBlocks) { - // seek to striped block group boundary - pos = blockSize * dataBlocks - 1; - assertSeekAndRead(fsdis, pos, writeBytes); - } + if (fileLength > blockSize * dataBlocks) { + // seek to striped block group boundary + pos = blockSize * dataBlocks - 1; + assertSeekAndRead(in, pos, fileLength); + } + if(!(in.getWrappedStream() instanceof ByteRangeInputStream)){ try { - fsdis.seek(-1); + in.seek(-1); Assert.fail("Should be failed if seek to negative offset"); } catch (EOFException e) { // expected } try { - fsdis.seek(writeBytes + 1); + in.seek(fileLength + 1); Assert.fail("Should be failed if seek after EOF"); } catch (EOFException e) { // expected } } - - // stateful read with ByteBuffer - try (FSDataInputStream fsdis = fs.open(new Path(src))) { - ByteBuffer buf = ByteBuffer.allocate(writeBytes + 100); - int readLen = 0; - int ret; - do { - ret = fsdis.read(buf); - if (ret > 0) { - readLen += ret; - } - } while (ret >= 0); - readLen = readLen >= 0 ? readLen : 0; - Assert.assertEquals("The length of file should be the same to write size", - writeBytes, readLen); - for (int i = 0; i < writeBytes; i++) { - Assert.assertEquals("Byte at " + i + " should be the same", getByte(i), - buf.array()[i]); - } - } - - // stateful read with 1KB size byte array - try (FSDataInputStream fsdis = fs.open(new Path(src))) { - final byte[] result = new byte[writeBytes]; - final byte[] buf = new byte[1024]; - int readLen = 0; - int ret; - do { - ret = fsdis.read(buf, 0, buf.length); - if (ret > 0) { - System.arraycopy(buf, 0, result, readLen, ret); - readLen += ret; - } - } while (ret >= 0); - Assert.assertEquals("The length of file should be the same to write size", - writeBytes, readLen); - Assert.assertArrayEquals(bytes, result); - } - - // stateful read using ByteBuffer with 1KB size - try (FSDataInputStream fsdis = fs.open(new Path(src))) { - final ByteBuffer result = ByteBuffer.allocate(writeBytes); - final ByteBuffer buf = ByteBuffer.allocate(1024); - int readLen = 0; - int ret; - do { - ret = fsdis.read(buf); - if (ret > 0) { - readLen += ret; - buf.flip(); - result.put(buf); - buf.clear(); - } - } while (ret >= 0); - Assert.assertEquals("The length of file should be the same to write size", - writeBytes, readLen); - Assert.assertArrayEquals(bytes, result.array()); - } + in.close(); } @Test