From: zhz@apache.org
To: common-commits@hadoop.apache.org
Reply-To: common-dev@hadoop.apache.org
Date: Mon, 10 Aug 2015 21:03:52 -0000
Subject: [05/18] hadoop git commit: HDFS-7285. Erasure Coding Support inside HDFS.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
new file mode 100644
index 0000000..baf6106
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+public class TestDFSStripedInputStream {
+
+  public static final Log LOG = LogFactory.getLog(TestDFSStripedInputStream.class);
+
+  private MiniDFSCluster cluster;
+  private Configuration conf = new Configuration();
+  private DistributedFileSystem fs;
+  private final Path dirPath = new Path("/striped");
+  private Path filePath = new Path(dirPath, "file");
+  private final ECSchema schema = ErasureCodingSchemaManager.getSystemDefaultSchema();
+  private final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS;
+  private final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS;
+  private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private final int NUM_STRIPE_PER_BLOCK = 2;
+  private final int INTERNAL_BLOCK_SIZE = NUM_STRIPE_PER_BLOCK * CELLSIZE;
+  private final int BLOCK_GROUP_SIZE = DATA_BLK_NUM * INTERNAL_BLOCK_SIZE;
+
+  @Before
+  public void setup() throws IOException {
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, INTERNAL_BLOCK_SIZE);
+    SimulatedFSDataset.setFactory(conf);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+        DATA_BLK_NUM + PARITY_BLK_NUM).build();
+    cluster.waitActive();
+    for (DataNode dn : cluster.getDataNodes()) {
+      DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+    }
+    fs = cluster.getFileSystem();
+    fs.mkdirs(dirPath);
+    fs.getClient().createErasureCodingZone(dirPath.toString(), null, CELLSIZE);
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test {@link DFSStripedInputStream#getBlockAt(long)}
+   */
+  @Test
+  public void testRefreshBlock() throws Exception {
+    final int numBlocks = 4;
+    DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+        NUM_STRIPE_PER_BLOCK, false);
+    LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+        filePath.toString(), 0, BLOCK_GROUP_SIZE * numBlocks);
+    final DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(),
+        filePath.toString(), false, schema, CELLSIZE, null);
+
+    List<LocatedBlock> lbList = lbs.getLocatedBlocks();
+    for
(LocatedBlock aLbList : lbList) { + LocatedStripedBlock lsb = (LocatedStripedBlock) aLbList; + LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(lsb, + CELLSIZE, DATA_BLK_NUM, PARITY_BLK_NUM); + for (int j = 0; j < DATA_BLK_NUM; j++) { + LocatedBlock refreshed = in.refreshLocatedBlock(blks[j]); + assertEquals(blks[j].getBlock(), refreshed.getBlock()); + assertEquals(blks[j].getStartOffset(), refreshed.getStartOffset()); + assertArrayEquals(blks[j].getLocations(), refreshed.getLocations()); + } + } + } + + @Test + public void testPread() throws Exception { + final int numBlocks = 2; + DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks, + NUM_STRIPE_PER_BLOCK, false); + LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations( + filePath.toString(), 0, BLOCK_GROUP_SIZE * numBlocks); + int fileLen = BLOCK_GROUP_SIZE * numBlocks; + + byte[] expected = new byte[fileLen]; + assertEquals(numBlocks, lbs.getLocatedBlocks().size()); + for (int bgIdx = 0; bgIdx < numBlocks; bgIdx++) { + LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(bgIdx)); + for (int i = 0; i < DATA_BLK_NUM; i++) { + Block blk = new Block(bg.getBlock().getBlockId() + i, + NUM_STRIPE_PER_BLOCK * CELLSIZE, + bg.getBlock().getGenerationStamp()); + blk.setGenerationStamp(bg.getBlock().getGenerationStamp()); + cluster.injectBlocks(i, Arrays.asList(blk), + bg.getBlock().getBlockPoolId()); + } + + /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */ + for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) { + for (int j = 0; j < DATA_BLK_NUM; j++) { + for (int k = 0; k < CELLSIZE; k++) { + int posInBlk = i * CELLSIZE + k; + int posInFile = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k; + expected[bgIdx*BLOCK_GROUP_SIZE + posInFile] = + SimulatedFSDataset.simulatedByte( + new Block(bg.getBlock().getBlockId() + j), posInBlk); + } + } + } + } + DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(), + filePath.toString(), false, schema, CELLSIZE, null); + + int[] startOffsets = {0, 1, CELLSIZE - 102, CELLSIZE, CELLSIZE + 102, + CELLSIZE*DATA_BLK_NUM, CELLSIZE*DATA_BLK_NUM + 102, + BLOCK_GROUP_SIZE - 102, BLOCK_GROUP_SIZE, BLOCK_GROUP_SIZE + 102, + fileLen - 1}; + for (int startOffset : startOffsets) { + startOffset = Math.max(0, Math.min(startOffset, fileLen - 1)); + int remaining = fileLen - startOffset; + byte[] buf = new byte[fileLen]; + int ret = in.read(startOffset, buf, 0, fileLen); + assertEquals(remaining, ret); + for (int i = 0; i < remaining; i++) { + Assert.assertEquals("Byte at " + (startOffset + i) + " should be the " + + "same", + expected[startOffset + i], buf[i]); + } + } + in.close(); + } + + @Test + public void testPreadWithDNFailure() throws Exception { + final int numBlocks = 4; + final int failedDNIdx = 2; + DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks, + NUM_STRIPE_PER_BLOCK, false); + LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations( + filePath.toString(), 0, BLOCK_GROUP_SIZE); + + assert lbs.get(0) instanceof LocatedStripedBlock; + LocatedStripedBlock bg = (LocatedStripedBlock)(lbs.get(0)); + for (int i = 0; i < DATA_BLK_NUM + PARITY_BLK_NUM; i++) { + Block blk = new Block(bg.getBlock().getBlockId() + i, + NUM_STRIPE_PER_BLOCK * CELLSIZE, + bg.getBlock().getGenerationStamp()); + blk.setGenerationStamp(bg.getBlock().getGenerationStamp()); + cluster.injectBlocks(i, Arrays.asList(blk), + bg.getBlock().getBlockPoolId()); + } + DFSStripedInputStream in = + new DFSStripedInputStream(fs.getClient(), filePath.toString(), 
false, + ErasureCodingSchemaManager.getSystemDefaultSchema(), CELLSIZE, null); + int readSize = BLOCK_GROUP_SIZE; + byte[] readBuffer = new byte[readSize]; + byte[] expected = new byte[readSize]; + cluster.stopDataNode(failedDNIdx); + /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */ + for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) { + for (int j = 0; j < DATA_BLK_NUM; j++) { + for (int k = 0; k < CELLSIZE; k++) { + int posInBlk = i * CELLSIZE + k; + int posInFile = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k; + expected[posInFile] = SimulatedFSDataset.simulatedByte( + new Block(bg.getBlock().getBlockId() + j), posInBlk); + } + } + } + + // Update the expected content for decoded data + for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) { + byte[][] decodeInputs = new byte[DATA_BLK_NUM + PARITY_BLK_NUM][CELLSIZE]; + int[] missingBlkIdx = new int[]{failedDNIdx + PARITY_BLK_NUM, 1, 2}; + byte[][] decodeOutputs = new byte[PARITY_BLK_NUM][CELLSIZE]; + for (int j = 0; j < DATA_BLK_NUM; j++) { + int posInBuf = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE; + if (j != failedDNIdx) { + System.arraycopy(expected, posInBuf, decodeInputs[j + PARITY_BLK_NUM], + 0, CELLSIZE); + } + } + for (int k = 0; k < CELLSIZE; k++) { + int posInBlk = i * CELLSIZE + k; + decodeInputs[0][k] = SimulatedFSDataset.simulatedByte( + new Block(bg.getBlock().getBlockId() + DATA_BLK_NUM), posInBlk); + } + for (int m : missingBlkIdx) { + decodeInputs[m] = null; + } + RawErasureDecoder rawDecoder = CodecUtil.createRSRawDecoder(conf, + DATA_BLK_NUM, PARITY_BLK_NUM); + rawDecoder.decode(decodeInputs, missingBlkIdx, decodeOutputs); + int posInBuf = i * CELLSIZE * DATA_BLK_NUM + failedDNIdx * CELLSIZE; + System.arraycopy(decodeOutputs[0], 0, expected, posInBuf, CELLSIZE); + } + int delta = 10; + int done = 0; + // read a small delta, shouldn't trigger decode + // |cell_0 | + // |10 | + done += in.read(0, readBuffer, 0, delta); + assertEquals(delta, done); + // both head and trail cells are partial + // |c_0 |c_1 |c_2 |c_3 |c_4 |c_5 | + // |256K - 10|missing|256K|256K|256K - 10|not in range| + done += in.read(delta, readBuffer, delta, + CELLSIZE * (DATA_BLK_NUM - 1) - 2 * delta); + assertEquals(CELLSIZE * (DATA_BLK_NUM - 1) - delta, done); + // read the rest + done += in.read(done, readBuffer, done, readSize - done); + assertEquals(readSize, done); + assertArrayEquals(expected, readBuffer); + } + + @Test + public void testStatefulRead() throws Exception { + testStatefulRead(false, false); + testStatefulRead(true, false); + testStatefulRead(true, true); + } + + private void testStatefulRead(boolean useByteBuffer, boolean cellMisalignPacket) + throws Exception { + final int numBlocks = 2; + final int fileSize = numBlocks * BLOCK_GROUP_SIZE; + if (cellMisalignPacket) { + conf.setInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT + 1); + tearDown(); + setup(); + } + DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks, + NUM_STRIPE_PER_BLOCK, false); + LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(filePath.toString(), 0, fileSize); + + assert lbs.getLocatedBlocks().size() == numBlocks; + for (LocatedBlock lb : lbs.getLocatedBlocks()) { + assert lb instanceof LocatedStripedBlock; + LocatedStripedBlock bg = (LocatedStripedBlock)(lb); + for (int i = 0; i < DATA_BLK_NUM; i++) { + Block blk = new Block(bg.getBlock().getBlockId() + i, + NUM_STRIPE_PER_BLOCK * CELLSIZE, + bg.getBlock().getGenerationStamp()); + blk.setGenerationStamp(bg.getBlock().getGenerationStamp()); + 
cluster.injectBlocks(i, Arrays.asList(blk), + bg.getBlock().getBlockPoolId()); + } + } + + DFSStripedInputStream in = + new DFSStripedInputStream(fs.getClient(), filePath.toString(), + false, schema, CELLSIZE, null); + + byte[] expected = new byte[fileSize]; + + for (LocatedBlock bg : lbs.getLocatedBlocks()) { + /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */ + for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) { + for (int j = 0; j < DATA_BLK_NUM; j++) { + for (int k = 0; k < CELLSIZE; k++) { + int posInBlk = i * CELLSIZE + k; + int posInFile = (int) bg.getStartOffset() + + i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k; + expected[posInFile] = SimulatedFSDataset.simulatedByte( + new Block(bg.getBlock().getBlockId() + j), posInBlk); + } + } + } + } + + if (useByteBuffer) { + ByteBuffer readBuffer = ByteBuffer.allocate(fileSize); + int done = 0; + while (done < fileSize) { + int ret = in.read(readBuffer); + assertTrue(ret > 0); + done += ret; + } + assertArrayEquals(expected, readBuffer.array()); + } else { + byte[] readBuffer = new byte[fileSize]; + int done = 0; + while (done < fileSize) { + int ret = in.read(readBuffer, done, fileSize - done); + assertTrue(ret > 0); + done += ret; + } + assertArrayEquals(expected, readBuffer); + } + fs.delete(filePath, true); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java new file mode 100644 index 0000000..3f40dee --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java @@ -0,0 +1,293 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.io.erasurecode.CodecUtil; +import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder; +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.log4j.Level; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestDFSStripedOutputStream { + public static final Log LOG = LogFactory.getLog( + TestDFSStripedOutputStream.class); + + static { + GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.ALL); + GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL); + } + + private int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS; + private int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS; + + private MiniDFSCluster cluster; + private DistributedFileSystem fs; + private Configuration conf; + private final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + private final int stripesPerBlock = 4; + private final int blockSize = cellSize * stripesPerBlock; + + @Before + public void setup() throws IOException { + int numDNs = dataBlocks + parityBlocks + 2; + conf = new Configuration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); + cluster.getFileSystem().getClient().createErasureCodingZone("/", null, 0); + fs = cluster.getFileSystem(); + } + + @After + public void tearDown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testFileEmpty() throws IOException { + testOneFile("/EmptyFile", 0); + } + + @Test + public void testFileSmallerThanOneCell1() throws IOException { + testOneFile("/SmallerThanOneCell", 1); + } + + @Test + public void testFileSmallerThanOneCell2() throws IOException { + testOneFile("/SmallerThanOneCell", cellSize - 1); + } + + @Test + public void testFileEqualsWithOneCell() throws IOException { + testOneFile("/EqualsWithOneCell", cellSize); + } + + @Test + public void testFileSmallerThanOneStripe1() throws IOException { + testOneFile("/SmallerThanOneStripe", cellSize * dataBlocks - 1); + } + + @Test + public void testFileSmallerThanOneStripe2() throws IOException { + testOneFile("/SmallerThanOneStripe", cellSize + 123); + } + + @Test + public void testFileEqualsWithOneStripe() throws IOException { + testOneFile("/EqualsWithOneStripe", cellSize * dataBlocks); + } + + @Test + public void testFileMoreThanOneStripe1() throws IOException { + testOneFile("/MoreThanOneStripe1", cellSize * dataBlocks + 123); + } + + @Test + public void testFileMoreThanOneStripe2() throws IOException { + testOneFile("/MoreThanOneStripe2", cellSize * dataBlocks + + cellSize * dataBlocks + 123); + } + + @Test + public void testFileLessThanFullBlockGroup() throws IOException { + testOneFile("/LessThanFullBlockGroup", + cellSize * 
dataBlocks * (stripesPerBlock - 1) + cellSize); + } + + @Test + public void testFileFullBlockGroup() throws IOException { + testOneFile("/FullBlockGroup", blockSize * dataBlocks); + } + + @Test + public void testFileMoreThanABlockGroup1() throws IOException { + testOneFile("/MoreThanABlockGroup1", blockSize * dataBlocks + 123); + } + + @Test + public void testFileMoreThanABlockGroup2() throws IOException { + testOneFile("/MoreThanABlockGroup2", + blockSize * dataBlocks + cellSize+ 123); + } + + + @Test + public void testFileMoreThanABlockGroup3() throws IOException { + testOneFile("/MoreThanABlockGroup3", + blockSize * dataBlocks * 3 + cellSize * dataBlocks + + cellSize + 123); + } + + private byte[] generateBytes(int cnt) { + byte[] bytes = new byte[cnt]; + for (int i = 0; i < cnt; i++) { + bytes[i] = getByte(i); + } + return bytes; + } + + private byte getByte(long pos) { + int mod = 29; + return (byte) (pos % mod + 1); + } + + private void testOneFile(String src, int writeBytes) throws IOException { + Path testPath = new Path(src); + + byte[] bytes = generateBytes(writeBytes); + DFSTestUtil.writeFile(fs, testPath, new String(bytes)); + + // check file length + FileStatus status = fs.getFileStatus(testPath); + Assert.assertEquals(writeBytes, status.getLen()); + + checkData(src, writeBytes); + } + + void checkData(String src, int writeBytes) throws IOException { + List> blockGroupList = new ArrayList<>(); + LocatedBlocks lbs = fs.getClient().getLocatedBlocks(src, 0L); + + for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) { + Assert.assertTrue(firstBlock instanceof LocatedStripedBlock); + LocatedBlock[] blocks = StripedBlockUtil. + parseStripedBlockGroup((LocatedStripedBlock) firstBlock, + cellSize, dataBlocks, parityBlocks); + List oneGroup = Arrays.asList(blocks); + blockGroupList.add(oneGroup); + } + + // test each block group + for (int group = 0; group < blockGroupList.size(); group++) { + //get the data of this block + List blockList = blockGroupList.get(group); + byte[][] dataBlockBytes = new byte[dataBlocks][]; + byte[][] parityBlockBytes = new byte[parityBlocks][]; + + // for each block, use BlockReader to read data + for (int i = 0; i < blockList.size(); i++) { + LocatedBlock lblock = blockList.get(i); + if (lblock == null) { + continue; + } + ExtendedBlock block = lblock.getBlock(); + byte[] blockBytes = new byte[(int)block.getNumBytes()]; + if (i < dataBlocks) { + dataBlockBytes[i] = blockBytes; + } else { + parityBlockBytes[i - dataBlocks] = blockBytes; + } + + if (block.getNumBytes() == 0) { + continue; + } + + final BlockReader blockReader = BlockReaderTestUtil.getBlockReader( + fs, lblock, 0, block.getNumBytes()); + blockReader.readAll(blockBytes, 0, (int) block.getNumBytes()); + blockReader.close(); + } + + // check if we write the data correctly + for (int blkIdxInGroup = 0; blkIdxInGroup < dataBlockBytes.length; + blkIdxInGroup++) { + final byte[] actualBlkBytes = dataBlockBytes[blkIdxInGroup]; + if (actualBlkBytes == null) { + continue; + } + for (int posInBlk = 0; posInBlk < actualBlkBytes.length; posInBlk++) { + // calculate the position of this byte in the file + long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG(cellSize, + dataBlocks, posInBlk, blkIdxInGroup) + + group * blockSize * dataBlocks; + Assert.assertTrue(posInFile < writeBytes); + final byte expected = getByte(posInFile); + + String s = "Unexpected byte " + actualBlkBytes[posInBlk] + + ", expect " + expected + + ". 
Block group index is " + group + + ", stripe index is " + posInBlk / cellSize + + ", cell index is " + blkIdxInGroup + + ", byte index is " + posInBlk % cellSize; + Assert.assertEquals(s, expected, actualBlkBytes[posInBlk]); + } + } + + verifyParity(lbs.getLocatedBlocks().get(group).getBlockSize(), + cellSize, dataBlockBytes, parityBlockBytes); + } + } + + void verifyParity(final long size, final int cellSize, + byte[][] dataBytes, byte[][] parityBytes) { + verifyParity(conf, size, cellSize, dataBytes, parityBytes, -1); + } + + static void verifyParity(Configuration conf, final long size, + final int cellSize, byte[][] dataBytes, + byte[][] parityBytes, int killedDnIndex) { + // verify the parity blocks + int parityBlkSize = (int) StripedBlockUtil.getInternalBlockLength( + size, cellSize, dataBytes.length, dataBytes.length); + final byte[][] expectedParityBytes = new byte[parityBytes.length][]; + for (int i = 0; i < parityBytes.length; i++) { + expectedParityBytes[i] = new byte[parityBlkSize]; + } + for (int i = 0; i < dataBytes.length; i++) { + if (dataBytes[i] == null) { + dataBytes[i] = new byte[dataBytes[0].length]; + } else if (dataBytes[i].length < dataBytes[0].length) { + final byte[] tmp = dataBytes[i]; + dataBytes[i] = new byte[dataBytes[0].length]; + System.arraycopy(tmp, 0, dataBytes[i], 0, tmp.length); + } + } + final RawErasureEncoder encoder = + CodecUtil.createRSRawEncoder(conf, + dataBytes.length, parityBytes.length); + encoder.encode(dataBytes, expectedParityBytes); + for (int i = 0; i < parityBytes.length; i++) { + if (i != killedDnIndex) { + Assert.assertArrayEquals("i=" + i + ", killedDnIndex=" + killedDnIndex, + expectedParityBytes[i], parityBytes[i]); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java new file mode 100644 index 0000000..6594ae1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java @@ -0,0 +1,329 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; +import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.log4j.Level; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.base.Preconditions; + +public class TestDFSStripedOutputStreamWithFailure { + public static final Log LOG = LogFactory.getLog( + TestDFSStripedOutputStreamWithFailure.class); + static { + GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.ALL); + GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL); + } + + private static final int NUM_DATA_BLOCKS = HdfsConstants.NUM_DATA_BLOCKS; + private static final int NUM_PARITY_BLOCKS = HdfsConstants.NUM_PARITY_BLOCKS; + private static final int CELL_SIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + private static final int STRIPES_PER_BLOCK = 4; + private static final int BLOCK_SIZE = CELL_SIZE * STRIPES_PER_BLOCK; + private static final int BLOCK_GROUP_SIZE = BLOCK_SIZE * NUM_DATA_BLOCKS; + + private static final int FLUSH_POS + = 9*DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT + 1; + + private MiniDFSCluster cluster; + private DistributedFileSystem dfs; + private final Path dir = new Path("/" + + TestDFSStripedOutputStreamWithFailure.class.getSimpleName()); + + private void setup(Configuration conf) throws IOException { + final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); + cluster.waitActive(); + dfs = cluster.getFileSystem(); + dfs.mkdirs(dir); + dfs.createErasureCodingZone(dir, null, 0); + } + + private void tearDown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + private static byte getByte(long pos) { + return (byte)pos; + } + + private void initConf(Configuration conf){ + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); + } + + private void initConfWithBlockToken(Configuration conf) { + conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); + conf.setInt("ipc.client.connect.max.retries", 0); + // Set short retry timeouts so this test runs faster + conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10); + } + + @Test(timeout=240000) + public void testDatanodeFailure() throws Exception { + final int length = 
NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE); + HdfsConfiguration conf = new HdfsConfiguration(); + initConf(conf); + for (int dn = 0; dn < 9; dn++) { + try { + setup(conf); + cluster.startDataNodes(conf, 1, true, null, null); + cluster.waitActive(); + runTest(new Path(dir, "file" + dn), length, length / 2, dn, false); + } catch (Exception e) { + LOG.error("failed, dn=" + dn + ", length=" + length); + throw e; + } finally { + tearDown(); + } + } + } + + @Test(timeout=240000) + public void testBlockTokenExpired() throws Exception { + final int length = NUM_DATA_BLOCKS * (BLOCK_SIZE - CELL_SIZE); + HdfsConfiguration conf = new HdfsConfiguration(); + initConf(conf); + initConfWithBlockToken(conf); + for (int dn = 0; dn < 9; dn += 2) { + try { + setup(conf); + cluster.startDataNodes(conf, 1, true, null, null); + cluster.waitActive(); + runTest(new Path(dir, "file" + dn), length, length / 2, dn, true); + } catch (Exception e) { + LOG.error("failed, dn=" + dn + ", length=" + length); + throw e; + } finally { + tearDown(); + } + } + } + + private void runTest(final Path p, final int length, final int killPos, + final int dnIndex, final boolean tokenExpire) throws Exception { + LOG.info("p=" + p + ", length=" + length + ", killPos=" + killPos + + ", dnIndex=" + dnIndex); + Preconditions.checkArgument(killPos < length); + Preconditions.checkArgument(killPos > FLUSH_POS); + final String fullPath = p.toString(); + + final NameNode nn = cluster.getNameNode(); + final BlockManager bm = nn.getNamesystem().getBlockManager(); + final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager(); + + if (tokenExpire) { + // set a short token lifetime (1 second) + SecurityTestUtil.setBlockTokenLifetime(sm, 1000L); + } + + final AtomicInteger pos = new AtomicInteger(); + final FSDataOutputStream out = dfs.create(p); + final DFSStripedOutputStream stripedOut + = (DFSStripedOutputStream)out.getWrappedStream(); + + long oldGS = -1; + boolean killed = false; + for(; pos.get() < length; ) { + final int i = pos.getAndIncrement(); + if (i == killPos) { + final long gs = getGenerationStamp(stripedOut); + Assert.assertTrue(oldGS != -1); + Assert.assertEquals(oldGS, gs); + + if (tokenExpire) { + DFSTestUtil.flushInternal(stripedOut); + waitTokenExpires(out); + } + + StripedFileTestUtil.killDatanode(cluster, stripedOut, dnIndex, pos); + killed = true; + } + + write(out, i); + + if (i == FLUSH_POS) { + oldGS = getGenerationStamp(stripedOut); + } + } + out.close(); + Assert.assertTrue(killed); + + // check file length + final FileStatus status = dfs.getFileStatus(p); + Assert.assertEquals(length, status.getLen()); + + checkData(dfs, fullPath, length, dnIndex, oldGS); + } + + static void write(FSDataOutputStream out, int i) throws IOException { + try { + out.write(getByte(i)); + } catch(IOException ioe) { + throw new IOException("Failed at i=" + i, ioe); + } + } + + static long getGenerationStamp(DFSStripedOutputStream out) + throws IOException { + final long gs = DFSTestUtil.flushInternal(out).getGenerationStamp(); + LOG.info("getGenerationStamp returns " + gs); + return gs; + + } + + static void checkData(DistributedFileSystem dfs, String src, int length, + int killedDnIndex, long oldGS) throws IOException { + List> blockGroupList = new ArrayList<>(); + LocatedBlocks lbs = dfs.getClient().getLocatedBlocks(src, 0L); + final int expectedNumGroup = (length - 1)/BLOCK_GROUP_SIZE + 1; + Assert.assertEquals(expectedNumGroup, lbs.getLocatedBlocks().size()); + + for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) { + 
Assert.assertTrue(firstBlock instanceof LocatedStripedBlock); + + final long gs = firstBlock.getBlock().getGenerationStamp(); + final String s = "gs=" + gs + ", oldGS=" + oldGS; + LOG.info(s); + Assert.assertTrue(s, gs > oldGS); + + LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup( + (LocatedStripedBlock) firstBlock, + CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS); + blockGroupList.add(Arrays.asList(blocks)); + } + + // test each block group + for (int group = 0; group < blockGroupList.size(); group++) { + final boolean isLastGroup = group == blockGroupList.size() - 1; + final int groupSize = !isLastGroup? BLOCK_GROUP_SIZE + : length - (blockGroupList.size() - 1)*BLOCK_GROUP_SIZE; + final int numCellInGroup = (groupSize - 1)/CELL_SIZE + 1; + final int lastCellIndex = (numCellInGroup - 1) % NUM_DATA_BLOCKS; + final int lastCellSize = groupSize - (numCellInGroup - 1)*CELL_SIZE; + + //get the data of this block + List blockList = blockGroupList.get(group); + byte[][] dataBlockBytes = new byte[NUM_DATA_BLOCKS][]; + byte[][] parityBlockBytes = new byte[NUM_PARITY_BLOCKS][]; + + // for each block, use BlockReader to read data + for (int i = 0; i < blockList.size(); i++) { + final int j = i >= NUM_DATA_BLOCKS? 0: i; + final int numCellInBlock = (numCellInGroup - 1)/NUM_DATA_BLOCKS + + (j <= lastCellIndex? 1: 0); + final int blockSize = numCellInBlock*CELL_SIZE + + (isLastGroup && i == lastCellIndex? lastCellSize - CELL_SIZE: 0); + + final byte[] blockBytes = new byte[blockSize]; + if (i < NUM_DATA_BLOCKS) { + dataBlockBytes[i] = blockBytes; + } else { + parityBlockBytes[i - NUM_DATA_BLOCKS] = blockBytes; + } + + final LocatedBlock lb = blockList.get(i); + LOG.info("XXX i=" + i + ", lb=" + lb); + if (lb == null) { + continue; + } + final ExtendedBlock block = lb.getBlock(); + Assert.assertEquals(blockSize, block.getNumBytes()); + + + if (block.getNumBytes() == 0) { + continue; + } + + if (i != killedDnIndex) { + final BlockReader blockReader = BlockReaderTestUtil.getBlockReader( + dfs, lb, 0, block.getNumBytes()); + blockReader.readAll(blockBytes, 0, (int) block.getNumBytes()); + blockReader.close(); + } + } + + // check data + final int groupPosInFile = group*BLOCK_GROUP_SIZE; + for (int i = 0; i < dataBlockBytes.length; i++) { + final byte[] actual = dataBlockBytes[i]; + for (int posInBlk = 0; posInBlk < actual.length; posInBlk++) { + final long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG( + CELL_SIZE, NUM_DATA_BLOCKS, posInBlk, i) + groupPosInFile; + Assert.assertTrue(posInFile < length); + final byte expected = getByte(posInFile); + + if (i == killedDnIndex) { + actual[posInBlk] = expected; + } else { + String s = "expected=" + expected + " but actual=" + actual[posInBlk] + + ", posInFile=" + posInFile + ", posInBlk=" + posInBlk + + ". 
group=" + group + ", i=" + i; + Assert.assertEquals(s, expected, actual[posInBlk]); + } + } + } + + // check parity + TestDFSStripedOutputStream.verifyParity(dfs.getConf(), + lbs.getLocatedBlocks().get(group).getBlockSize(), + CELL_SIZE, dataBlockBytes, parityBlockBytes, + killedDnIndex - dataBlockBytes.length); + } + } + + private void waitTokenExpires(FSDataOutputStream out) throws IOException { + Token token = DFSTestUtil.getBlockToken(out); + while (!SecurityTestUtil.isBlockTokenExpired(token)) { + try { + Thread.sleep(10); + } catch (InterruptedException ignored) { + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java index a821c30..4233a1c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java @@ -110,7 +110,7 @@ public class TestDFSUtil { l2.setCorrupt(true); List ls = Arrays.asList(l1, l2); - LocatedBlocks lbs = new LocatedBlocks(10, false, ls, l2, true, null); + LocatedBlocks lbs = new LocatedBlocks(10, false, ls, l2, true, null, null, 0); BlockLocation[] bs = DFSUtilClient.locatedBlocks2Locations(lbs); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestECSchemas.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestECSchemas.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestECSchemas.java new file mode 100644 index 0000000..88198c9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestECSchemas.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs; + +import static org.junit.Assert.*; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager; +import org.apache.hadoop.io.erasurecode.ECSchema; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestECSchemas { + private MiniDFSCluster cluster; + + @Before + public void before() throws IOException { + cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(0) + .build(); + cluster.waitActive(); + } + + @After + public void after() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testGetECSchemas() throws Exception { + ECSchema[] ecSchemas = cluster.getFileSystem().getClient().getECSchemas(); + assertNotNull(ecSchemas); + assertTrue("Should have at least one schema", ecSchemas.length > 0); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java index 567a70a..8724ed5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java @@ -743,7 +743,7 @@ public class TestEncryptionZones { version, new byte[suite.getAlgorithmBlockSize()], new byte[suite.getAlgorithmBlockSize()], "fakeKey", "fakeVersion"), - (byte) 0)) + (byte) 0, null, 0)) .when(mcp) .create(anyString(), (FsPermission) anyObject(), anyString(), (EnumSetWritable) anyObject(), anyBoolean(), http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingZones.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingZones.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingZones.java new file mode 100644 index 0000000..1a10ebf --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingZones.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.namenode.INode; +import org.apache.hadoop.io.erasurecode.ECSchema; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains; +import static org.junit.Assert.*; + +public class TestErasureCodingZones { + private Configuration conf; + private MiniDFSCluster cluster; + private DistributedFileSystem fs; + private static final int BLOCK_SIZE = 1024; + private FSNamesystem namesystem; + + @Before + public void setupCluster() throws IOException { + conf = new HdfsConfiguration(); + conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + cluster = new MiniDFSCluster.Builder(conf). + numDataNodes(1).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + namesystem = cluster.getNamesystem(); + } + + @After + public void shutdownCluster() throws IOException { + cluster.shutdown(); + } + + @Test + public void testCreateECZone() + throws IOException, InterruptedException { + final Path testDir = new Path("/ec"); + fs.mkdir(testDir, FsPermission.getDirDefault()); + + /* Normal creation of an erasure coding zone */ + fs.getClient().createErasureCodingZone(testDir.toString(), null, 0); + + /* Verify files under the zone are striped */ + final Path ECFilePath = new Path(testDir, "foo"); + fs.create(ECFilePath); + INode inode = namesystem.getFSDirectory().getINode(ECFilePath.toString()); + assertTrue(inode.asFile().isStriped()); + + /* Verify that EC zone cannot be created on non-empty dir */ + final Path notEmpty = new Path("/nonEmpty"); + fs.mkdir(notEmpty, FsPermission.getDirDefault()); + fs.create(new Path(notEmpty, "foo")); + try { + fs.getClient().createErasureCodingZone(notEmpty.toString(), null, 0); + fail("Erasure coding zone on non-empty dir"); + } catch (IOException e) { + assertExceptionContains("erasure coding zone for a non-empty directory", e); + } + + /* Verify that nested EC zones cannot be created */ + final Path zone1 = new Path("/zone1"); + final Path zone2 = new Path(zone1, "zone2"); + fs.mkdir(zone1, FsPermission.getDirDefault()); + fs.getClient().createErasureCodingZone(zone1.toString(), null, 0); + fs.mkdir(zone2, FsPermission.getDirDefault()); + try { + fs.getClient().createErasureCodingZone(zone2.toString(), null, 0); + fail("Nested erasure coding zones"); + } catch (IOException e) { + assertExceptionContains("already in an erasure coding zone", e); + } + + /* Verify that EC zone cannot be created on a file */ + final Path fPath = new Path("/file"); + fs.create(fPath); + try { + fs.getClient().createErasureCodingZone(fPath.toString(), null, 0); + fail("Erasure coding zone on file"); + } catch (IOException e) { + assertExceptionContains("erasure coding zone for a file", e); + } + } + + @Test + public void testMoveValidity() throws IOException, InterruptedException { + final Path srcECDir = new Path("/srcEC"); + final Path dstECDir = new Path("/dstEC"); + fs.mkdir(srcECDir, FsPermission.getDirDefault()); + fs.mkdir(dstECDir, FsPermission.getDirDefault()); + 
fs.getClient().createErasureCodingZone(srcECDir.toString(), null, 0); + fs.getClient().createErasureCodingZone(dstECDir.toString(), null, 0); + final Path srcFile = new Path(srcECDir, "foo"); + fs.create(srcFile); + + // Test move dir + // Move EC dir under non-EC dir + final Path newDir = new Path("/srcEC_new"); + fs.rename(srcECDir, newDir); + fs.rename(newDir, srcECDir); // move back + + // Move EC dir under another EC dir + fs.rename(srcECDir, dstECDir); + fs.rename(new Path("/dstEC/srcEC"), srcECDir); // move back + + // Test move file + /* Verify that a file can be moved between 2 EC zones */ + fs.rename(srcFile, dstECDir); + fs.rename(new Path(dstECDir, "foo"), srcECDir); // move back + + /* Verify that a file cannot be moved from a non-EC dir to an EC zone */ + final Path nonECDir = new Path("/nonEC"); + fs.mkdir(nonECDir, FsPermission.getDirDefault()); + try { + fs.rename(srcFile, nonECDir); + fail("A file shouldn't be able to move from a non-EC dir to an EC zone"); + } catch (IOException e) { + assertExceptionContains("can't be moved because the source and " + + "destination have different erasure coding policies", e); + } + + /* Verify that a file cannot be moved from an EC zone to a non-EC dir */ + final Path nonECFile = new Path(nonECDir, "nonECFile"); + fs.create(nonECFile); + try { + fs.rename(nonECFile, dstECDir); + } catch (IOException e) { + assertExceptionContains("can't be moved because the source and " + + "destination have different erasure coding policies", e); + } + } + + @Test + public void testReplication() throws IOException { + final Path testDir = new Path("/ec"); + fs.mkdir(testDir, FsPermission.getDirDefault()); + fs.createErasureCodingZone(testDir, null, 0); + final Path fooFile = new Path(testDir, "foo"); + // create ec file with replication=0 + fs.create(fooFile, FsPermission.getFileDefault(), true, + conf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096), + (short)0, fs.getDefaultBlockSize(fooFile), null); + + try { + fs.setReplication(fooFile, (short) 3); + fail("Shouldn't allow to set replication to a file with striped blocks"); + } catch (IOException e) { + assertExceptionContains( + "Cannot set replication to a file with striped blocks", e); + } + } + + @Test + public void testGetErasureCodingInfoWithSystemDefaultSchema() throws Exception { + String src = "/ec"; + final Path ecDir = new Path(src); + fs.mkdir(ecDir, FsPermission.getDirDefault()); + // dir ECInfo before creating ec zone + assertNull(fs.getClient().getFileInfo(src).getECSchema()); + // dir ECInfo after creating ec zone + fs.getClient().createErasureCodingZone(src, null, 0); //Default one will be used. 
+ ECSchema sysDefaultSchema = ErasureCodingSchemaManager.getSystemDefaultSchema(); + verifyErasureCodingInfo(src, sysDefaultSchema); + fs.create(new Path(ecDir, "child1")).close(); + // verify for the files in ec zone + verifyErasureCodingInfo(src + "/child1", sysDefaultSchema); + } + + @Test + public void testGetErasureCodingInfo() throws Exception { + ECSchema[] sysSchemas = ErasureCodingSchemaManager.getSystemSchemas(); + assertTrue("System schemas should be of only 1 for now", + sysSchemas.length == 1); + + ECSchema usingSchema = sysSchemas[0]; + String src = "/ec2"; + final Path ecDir = new Path(src); + fs.mkdir(ecDir, FsPermission.getDirDefault()); + // dir ECInfo before creating ec zone + assertNull(fs.getClient().getFileInfo(src).getECSchema()); + // dir ECInfo after creating ec zone + fs.getClient().createErasureCodingZone(src, usingSchema, 0); + verifyErasureCodingInfo(src, usingSchema); + fs.create(new Path(ecDir, "child1")).close(); + // verify for the files in ec zone + verifyErasureCodingInfo(src + "/child1", usingSchema); + } + + private void verifyErasureCodingInfo( + String src, ECSchema usingSchema) throws IOException { + HdfsFileStatus hdfsFileStatus = fs.getClient().getFileInfo(src); + ECSchema schema = hdfsFileStatus.getECSchema(); + assertNotNull(schema); + assertEquals("Actually used schema should be equal with target schema", + usingSchema, schema); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatusWithECschema.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatusWithECschema.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatusWithECschema.java new file mode 100644 index 0000000..3c400b7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatusWithECschema.java @@ -0,0 +1,65 @@ +package org.apache.hadoop.hdfs; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager; +import org.apache.hadoop.io.erasurecode.ECSchema; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestFileStatusWithECschema { + private MiniDFSCluster cluster; + private DistributedFileSystem fs; + private DFSClient client; + + @Before + public void before() throws IOException { + cluster = + new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + client = fs.getClient(); + } + + @After + public void after() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testFileStatusWithECschema() throws Exception { + // test directory not in EC zone + final Path dir = new Path("/foo"); + assertTrue(fs.mkdir(dir, FsPermission.getDirDefault())); + assertNull(client.getFileInfo(dir.toString()).getECSchema()); + // test file not in EC zone + final Path file = new Path(dir, "foo"); + fs.create(file).close(); + assertNull(client.getFileInfo(file.toString()).getECSchema()); + fs.delete(file, true); + + final ECSchema schema1 = 
ErasureCodingSchemaManager.getSystemDefaultSchema(); + // create EC zone on dir + fs.createErasureCodingZone(dir, schema1, 0); + final ECSchema schame2 = client.getFileInfo(dir.toUri().getPath()).getECSchema(); + assertNotNull(schame2); + assertTrue(schema1.equals(schame2)); + + // test file in EC zone + fs.create(file).close(); + final ECSchema schame3 = + fs.getClient().getFileInfo(file.toUri().getPath()).getECSchema(); + assertNotNull(schame3); + assertTrue(schema1.equals(schame3)); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java index 985f43e..d0cd335 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLease.java @@ -354,12 +354,12 @@ public class TestLease { Mockito.doReturn( new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission( (short) 777), "owner", "group", new byte[0], new byte[0], - 1010, 0, null, (byte) 0)).when(mcp).getFileInfo(anyString()); + 1010, 0, null, (byte) 0, null, 0)).when(mcp).getFileInfo(anyString()); Mockito .doReturn( new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission( (short) 777), "owner", "group", new byte[0], new byte[0], - 1010, 0, null, (byte) 0)) + 1010, 0, null, (byte) 0, null, 0)) .when(mcp) .create(anyString(), (FsPermission) anyObject(), anyString(), (EnumSetWritable) anyObject(), anyBoolean(), http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java new file mode 100644 index 0000000..1719d3f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java @@ -0,0 +1,358 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.cellSize; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.dataBlocks; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.parityBlocks; + +public class TestReadStripedFileWithDecoding { + static final Log LOG = LogFactory.getLog(TestReadStripedFileWithDecoding.class); + + private MiniDFSCluster cluster; + private DistributedFileSystem fs; + private final int smallFileLength = blockSize * dataBlocks - 123; + private final int largeFileLength = blockSize * dataBlocks + 123; + private final int[] fileLengths = {smallFileLength, largeFileLength}; + private final int[] dnFailureNums = {1, 2, 3}; + + @Before + public void setup() throws IOException { + cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()) + .numDataNodes(numDNs).build(); + cluster.getFileSystem().getClient().createErasureCodingZone("/", + null, cellSize); + fs = cluster.getFileSystem(); + } + + @After + public void tearDown() throws IOException { + if (cluster != null) { + cluster.shutdown(); + } + } + + /** + * Shutdown tolerable number of Datanode before reading. + * Verify the decoding works correctly. + */ + @Test(timeout=300000) + public void testReadWithDNFailure() throws IOException { + for (int fileLength : fileLengths) { + for (int dnFailureNum : dnFailureNums) { + try { + // setup a new cluster with no dead datanode + setup(); + testReadWithDNFailure(fileLength, dnFailureNum); + } catch (IOException ioe) { + String fileType = fileLength < (blockSize * dataBlocks) ? + "smallFile" : "largeFile"; + LOG.error("Failed to read file with DN failure:" + + " fileType = "+ fileType + + ", dnFailureNum = " + dnFailureNum); + } finally { + // tear down the cluster + tearDown(); + } + } + } + } + + /** + * Corrupt tolerable number of block before reading. + * Verify the decoding works correctly. 
+ */ + @Test(timeout=300000) + public void testReadCorruptedData() throws IOException { + for (int fileLength : fileLengths) { + for (int dataDelNum = 1; dataDelNum < 4; dataDelNum++) { + for (int parityDelNum = 0; (dataDelNum+parityDelNum) < 4; parityDelNum++) { + String src = "/corrupted_" + dataDelNum + "_" + parityDelNum; + testReadWithBlockCorrupted(src, fileLength, + dataDelNum, parityDelNum, false); + } + } + } + } + + /** + * Delete tolerable number of block before reading. + * Verify the decoding works correctly. + */ + @Test(timeout=300000) + public void testReadCorruptedDataByDeleting() throws IOException { + for (int fileLength : fileLengths) { + for (int dataDelNum = 1; dataDelNum < 4; dataDelNum++) { + for (int parityDelNum = 0; (dataDelNum+parityDelNum) < 4; parityDelNum++) { + String src = "/deleted_" + dataDelNum + "_" + parityDelNum; + testReadWithBlockCorrupted(src, fileLength, + dataDelNum, parityDelNum, true); + } + } + } + } + + private int findFirstDataNode(Path file, long length) throws IOException { + BlockLocation[] locs = fs.getFileBlockLocations(file, 0, length); + String name = (locs[0].getNames())[0]; + int dnIndex = 0; + for (DataNode dn : cluster.getDataNodes()) { + int port = dn.getXferPort(); + if (name.contains(Integer.toString(port))) { + return dnIndex; + } + dnIndex++; + } + return -1; + } + + private void verifyRead(Path testPath, int length, byte[] expected) + throws IOException { + byte[] buffer = new byte[length + 100]; + StripedFileTestUtil.verifyLength(fs, testPath, length); + StripedFileTestUtil.verifyPread(fs, testPath, length, expected, buffer); + StripedFileTestUtil.verifyStatefulRead(fs, testPath, length, expected, buffer); + StripedFileTestUtil.verifyStatefulRead(fs, testPath, length, expected, + ByteBuffer.allocate(length + 100)); + StripedFileTestUtil.verifySeek(fs, testPath, length); + } + + private void testReadWithDNFailure(int fileLength, int dnFailureNum) + throws IOException { + String fileType = fileLength < (blockSize * dataBlocks) ? + "smallFile" : "largeFile"; + String src = "/dnFailure_" + dnFailureNum + "_" + fileType; + LOG.info("testReadWithDNFailure: file = " + src + + ", fileSize = " + fileLength + + ", dnFailureNum = " + dnFailureNum); + + Path testPath = new Path(src); + final byte[] bytes = StripedFileTestUtil.generateBytes(fileLength); + DFSTestUtil.writeFile(fs, testPath, bytes); + + // shut down the DN that holds an internal data block + BlockLocation[] locs = fs.getFileBlockLocations(testPath, cellSize * 5, + cellSize); + for (int failedDnIdx = 0; failedDnIdx < dnFailureNum; failedDnIdx++) { + String name = (locs[0].getNames())[failedDnIdx]; + for (DataNode dn : cluster.getDataNodes()) { + int port = dn.getXferPort(); + if (name.contains(Integer.toString(port))) { + dn.shutdown(); + } + } + } + + // check file length, pread, stateful read and seek + verifyRead(testPath, fileLength, bytes); + } + + /** + * After reading a corrupted block, make sure the client can correctly report + * the corruption to the NameNode. 
+ */ + @Test + public void testReportBadBlock() throws IOException { + // create file + final Path file = new Path("/corrupted"); + final int length = 10; // length of "corruption" + final byte[] bytes = StripedFileTestUtil.generateBytes(length); + DFSTestUtil.writeFile(fs, file, bytes); + + // corrupt the first data block + int dnIndex = findFirstDataNode(file, cellSize * dataBlocks); + Assert.assertNotEquals(-1, dnIndex); + LocatedStripedBlock slb = (LocatedStripedBlock)fs.getClient() + .getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0); + final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb, + cellSize, dataBlocks, parityBlocks); + // find the first block file + File storageDir = cluster.getInstanceStorageDir(dnIndex, 0); + File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock()); + Assert.assertTrue("Block file does not exist", blkFile.exists()); + // corrupt the block file + LOG.info("Deliberately corrupting file " + blkFile.getName()); + try (FileOutputStream out = new FileOutputStream(blkFile)) { + out.write("corruption".getBytes()); + } + + // disable the heartbeat from DN so that the corrupted block record is kept + // in NameNode + for (DataNode dn : cluster.getDataNodes()) { + DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true); + } + + try { + // do stateful read + StripedFileTestUtil.verifyStatefulRead(fs, file, length, bytes, + ByteBuffer.allocate(1024)); + + // check whether the corruption has been reported to the NameNode + final FSNamesystem ns = cluster.getNamesystem(); + final BlockManager bm = ns.getBlockManager(); + BlockInfo blockInfo = (ns.getFSDirectory().getINode4Write(file.toString()) + .asFile().getBlocks())[0]; + Assert.assertEquals(1, bm.getCorruptReplicas(blockInfo).size()); + } finally { + for (DataNode dn : cluster.getDataNodes()) { + DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false); + } + } + } + + @Test + public void testInvalidateBlock() throws IOException { + final Path file = new Path("/invalidate"); + final int length = 10; + final byte[] bytes = StripedFileTestUtil.generateBytes(length); + DFSTestUtil.writeFile(fs, file, bytes); + + int dnIndex = findFirstDataNode(file, cellSize * dataBlocks); + Assert.assertNotEquals(-1, dnIndex); + LocatedStripedBlock slb = (LocatedStripedBlock)fs.getClient() + .getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0); + final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb, + cellSize, dataBlocks, parityBlocks); + final Block b = blks[0].getBlock().getLocalBlock(); + + DataNode dn = cluster.getDataNodes().get(dnIndex); + // disable the heartbeat from DN so that the invalidated block record is kept + // in NameNode until heartbeat expires and NN mark the dn as dead + DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true); + + try { + // delete the file + fs.delete(file, true); + // check the block is added to invalidateBlocks + final FSNamesystem fsn = cluster.getNamesystem(); + final BlockManager bm = fsn.getBlockManager(); + DatanodeDescriptor dnd = NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId()); + Assert.assertTrue(bm.containsInvalidateBlock( + blks[0].getLocations()[0], b) || dnd.containsInvalidateBlock(b)); + } finally { + DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false); + } + } + + /** + * Test reading a file with some blocks(data blocks or parity blocks or both) + * deleted or corrupted. 
+ * @param src file path + * @param fileLength file length + * @param dataBlkDelNum the deleted or corrupted number of data blocks. + * @param parityBlkDelNum the deleted or corrupted number of parity blocks. + * @param deleteBlockFile whether block file is deleted or corrupted. + * true is to delete the block file. + * false is to corrupt the content of the block file. + * @throws IOException + */ + private void testReadWithBlockCorrupted(String src, int fileLength, + int dataBlkDelNum, int parityBlkDelNum, boolean deleteBlockFile) + throws IOException { + LOG.info("testReadWithBlockCorrupted: file = " + src + + ", dataBlkDelNum = " + dataBlkDelNum + + ", parityBlkDelNum = " + parityBlkDelNum + + ", deleteBlockFile? " + deleteBlockFile); + int recoverBlkNum = dataBlkDelNum + parityBlkDelNum; + Assert.assertTrue("dataBlkDelNum and parityBlkDelNum should be positive", + dataBlkDelNum >= 0 && parityBlkDelNum >= 0); + Assert.assertTrue("The sum of dataBlkDelNum and parityBlkDelNum " + + "should be between 1 ~ " + parityBlocks, recoverBlkNum <= parityBlocks); + + // write a file with the length of writeLen + Path srcPath = new Path(src); + final byte[] bytes = StripedFileTestUtil.generateBytes(fileLength); + DFSTestUtil.writeFile(fs, srcPath, bytes); + + // delete or corrupt some blocks + corruptBlocks(srcPath, dataBlkDelNum, parityBlkDelNum, deleteBlockFile); + + // check the file can be read after some blocks were deleted + verifyRead(srcPath, fileLength, bytes); + } + + private void corruptBlocks(Path srcPath, int dataBlkDelNum, + int parityBlkDelNum, boolean deleteBlockFile) throws IOException { + int recoverBlkNum = dataBlkDelNum + parityBlkDelNum; + + LocatedBlocks locatedBlocks = getLocatedBlocks(srcPath); + LocatedStripedBlock lastBlock = + (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock(); + + int[] delDataBlkIndices = StripedFileTestUtil.randomArray(0, dataBlocks, + dataBlkDelNum); + Assert.assertNotNull(delDataBlkIndices); + int[] delParityBlkIndices = StripedFileTestUtil.randomArray(dataBlocks, + dataBlocks + parityBlocks, parityBlkDelNum); + Assert.assertNotNull(delParityBlkIndices); + + int[] delBlkIndices = new int[recoverBlkNum]; + System.arraycopy(delDataBlkIndices, 0, + delBlkIndices, 0, delDataBlkIndices.length); + System.arraycopy(delParityBlkIndices, 0, + delBlkIndices, delDataBlkIndices.length, delParityBlkIndices.length); + + ExtendedBlock[] delBlocks = new ExtendedBlock[recoverBlkNum]; + for (int i = 0; i < recoverBlkNum; i++) { + delBlocks[i] = StripedBlockUtil + .constructInternalBlock(lastBlock.getBlock(), + cellSize, dataBlocks, delBlkIndices[i]); + if (deleteBlockFile) { + // delete the block file + cluster.corruptBlockOnDataNodesByDeletingBlockFile(delBlocks[i]); + } else { + // corrupt the block file + cluster.corruptBlockOnDataNodes(delBlocks[i]); + } + } + } + + private LocatedBlocks getLocatedBlocks(Path filePath) throws IOException { + return fs.getClient().getLocatedBlocks(filePath.toString(), + 0, Long.MAX_VALUE); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5323a659/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithMissingBlocks.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithMissingBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithMissingBlocks.java new file mode 100644 index 0000000..4c2438d --- /dev/null 
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithMissingBlocks.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.cellSize; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.dataBlocks; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs; + +/** + * Test reading a striped file when some of its blocks are missing (not included + * in the block locations returned by the NameNode). 
+ */ +public class TestReadStripedFileWithMissingBlocks { + public static final Log LOG = LogFactory + .getLog(TestReadStripedFileWithMissingBlocks.class); + private static MiniDFSCluster cluster; + private static FileSystem fs; + private static Configuration conf = new HdfsConfiguration(); + private final int fileLength = blockSize * dataBlocks + 123; + + @Before + public void setup() throws IOException { + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); + cluster.getFileSystem().getClient().createErasureCodingZone("/", + null, cellSize); + fs = cluster.getFileSystem(); + } + + @After + public void tearDown() throws IOException { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testReadFileWithMissingBlocks1() throws IOException { + readFileWithMissingBlocks(new Path("/foo"), fileLength, 1, 0); + } + + @Test + public void testReadFileWithMissingBlocks2() throws IOException { + readFileWithMissingBlocks(new Path("/foo"), fileLength, 1, 1); + } + + @Test + public void testReadFileWithMissingBlocks3() throws IOException { + readFileWithMissingBlocks(new Path("/foo"), fileLength, 1, 2); + } + + @Test + public void testReadFileWithMissingBlocks4() throws IOException { + readFileWithMissingBlocks(new Path("/foo"), fileLength, 2, 0); + } + + @Test + public void testReadFileWithMissingBlocks5() throws IOException { + readFileWithMissingBlocks(new Path("/foo"), fileLength, 2, 1); + } + + @Test + public void testReadFileWithMissingBlocks6() throws IOException { + readFileWithMissingBlocks(new Path("/foo"), fileLength, 3, 0); + } + + private void readFileWithMissingBlocks(Path srcPath, int fileLength, + int missingDataNum, int missingParityNum) + throws IOException { + LOG.info("readFileWithMissingBlocks: (" + missingDataNum + "," + + missingParityNum + ")"); + final byte[] expected = StripedFileTestUtil.generateBytes(fileLength); + DFSTestUtil.writeFile(fs, srcPath, new String(expected)); + StripedFileTestUtil.verifyLength(fs, srcPath, fileLength); + int dataBlocks = (fileLength - 1) / cellSize + 1; + BlockLocation[] locs = fs.getFileBlockLocations(srcPath, 0, cellSize); + + int[] missingDataNodes = new int[missingDataNum + missingParityNum]; + for (int i = 0; i < missingDataNum; i++) { + missingDataNodes[i] = i; + } + for (int i = 0; i < missingParityNum; i++) { + missingDataNodes[i + missingDataNum] = i + + Math.min(StripedFileTestUtil.dataBlocks, dataBlocks); + } + stopDataNodes(locs, missingDataNodes); + + // make sure there are missing block locations + BlockLocation[] newLocs = fs.getFileBlockLocations(srcPath, 0, cellSize); + Assert.assertTrue(newLocs[0].getNames().length < locs[0].getNames().length); + + byte[] smallBuf = new byte[1024]; + byte[] largeBuf = new byte[fileLength + 100]; + StripedFileTestUtil.verifySeek(fs, srcPath, fileLength); + StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected, + smallBuf); + StripedFileTestUtil.verifyPread(fs, srcPath, fileLength, expected, largeBuf); + + // delete the file + fs.delete(srcPath, true); + } + + private void stopDataNodes(BlockLocation[] locs, int[] datanodes) + throws IOException { + if (locs != null && locs.length > 0) { + for (int failedDNIdx : datanodes) { + String name = (locs[0].getNames())[failedDNIdx]; + for (DataNode dn : cluster.getDataNodes()) { + int port = dn.getXferPort(); + if (name.contains(Integer.toString(port))) { + dn.shutdown(); + cluster.setDataNodeDead(dn.getDatanodeId()); + 
LOG.info("stop datanode " + failedDNIdx); + break; + } + } + } + } + } +}