From: zhz@apache.org
To: common-commits@hadoop.apache.org
Reply-To: common-dev@hadoop.apache.org
Message-Id: <7c33db504c0c4d3e9a6ef10bd11bae8c@git.apache.org>
Subject: hadoop git commit: HDFS-8220. Erasure Coding: StripedDataStreamer fails to handle the blocklocations which doesn't satisfy BlockGroupSize.
Date: Fri, 14 Aug 2015 22:16:41 +0000 (UTC)

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7285-merge acbe42a85 -> 5c31bf7a1


HDFS-8220. Erasure Coding: StripedDataStreamer fails to handle the
blocklocations which doesn't satisfy BlockGroupSize.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5c31bf7a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5c31bf7a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5c31bf7a

Branch: refs/heads/HDFS-7285-merge
Commit: 5c31bf7a1c2316857f312b41a207e22b36e0f38d
Parents: acbe42a
Author: Zhe Zhang
Authored: Fri Aug 14 15:16:22 2015 -0700
Committer: Zhe Zhang
Committed: Fri Aug 14 15:16:22 2015 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |  3 +
 .../apache/hadoop/hdfs/StripedDataStreamer.java | 42 +++++++---
 .../TestDFSStripedOutputStreamWithFailure.java  | 84 ++++++++++++++++++++
 3 files changed, 120 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c31bf7a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 173bf9b..f1fb7a9 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -397,3 +397,6 @@
 
     HDFS-8854. Erasure coding: add ECPolicy to replace schema+cellSize in
     hadoop-hdfs. (Walter Su via zhz)
+
+    HDFS-8220. Erasure Coding: StripedDataStreamer fails to handle the
+    blocklocations which doesn't satisfy BlockGroupSize. (Rakesh R via zhz)
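
The substance of the change is a fail-fast check in StripedDataStreamer#locateFollowingBlock: if the NameNode returns fewer block locations than the block group has data blocks, the write is aborted with an IOException instead of continuing with an incomplete group. A minimal, self-contained sketch of that check is below; the class name and the main() driver are hypothetical, and the RS-6-3 constants simply mirror the NUM_DATA_BLOCKS / NUM_PARITY_BLOCKS values this branch assumes.

    import java.io.IOException;

    public class BlockGroupSizeCheck {
      // Assumed RS-6-3 layout, mirroring HdfsConstants.NUM_DATA_BLOCKS and
      // HdfsConstants.NUM_PARITY_BLOCKS as used in the patch below.
      static final int NUM_DATA_BLOCKS = 6;
      static final int NUM_PARITY_BLOCKS = 3;

      // Fail fast when the NameNode returned fewer locations than there are
      // data blocks in the erasure-coded block group.
      static void validateLocatedNodes(int locatedNodes) throws IOException {
        if (locatedNodes < NUM_DATA_BLOCKS) {
          throw new IOException(
              "Failed to get datablocks number of nodes from namenode: blockGroupSize= "
                  + (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS)
                  + ", blocks.length= " + locatedNodes);
        }
      }

      public static void main(String[] args) throws IOException {
        validateLocatedNodes(9); // full block group: passes
        validateLocatedNodes(5); // fewer live DataNodes than data blocks: throws
      }
    }

The first new test below, testAddBlockWhenNoSufficientDataBlockNumOfNodes, drives this path by stopping half of the cluster's DataNodes before writing.
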

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c31bf7a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
index 2d51dc4..f533bf9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.DFSStripedOutputStream.Coordinator;
 import org.apache.hadoop.hdfs.DFSStripedOutputStream.MultipleBlockingQueue;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
@@ -167,18 +168,33 @@ public class StripedDataStreamer extends DataStreamer {
       final LocatedBlock lb = StripedDataStreamer.super.locateFollowingBlock(
           excludedNodes);
 
+      if (lb.getLocations().length < HdfsConstants.NUM_DATA_BLOCKS) {
+        throw new IOException(
+            "Failed to get datablocks number of nodes from namenode: blockGroupSize= "
+                + (HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS)
+                + ", blocks.length= " + lb.getLocations().length);
+      }
       final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
           (LocatedStripedBlock)lb,
           BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
 
       for (int i = 0; i < blocks.length; i++) {
-        if (!coordinator.getStripedDataStreamer(i).isFailed()) {
-          if (blocks[i] == null) {
-            getLastException().set(
-                new IOException("Failed to get following block, i=" + i));
-          } else {
-            followingBlocks.offer(i, blocks[i]);
-          }
+        StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
+        if (si.isFailed()) {
+          continue; // skipping failed data streamer
+        }
+        if (blocks[i] == null) {
+          // Set exception and close streamer as there is no block locations
+          // found for the parity block.
+          LOG.warn("Failed to get block location for parity block, index="
+              + i);
+          si.getLastException().set(
+              new IOException("Failed to get following block, i=" + i));
+          si.setFailed(true);
+          si.endBlock();
+          si.close(true);
+        } else {
+          followingBlocks.offer(i, blocks[i]);
         }
       }
     }
@@ -199,7 +215,11 @@ public class StripedDataStreamer extends DataStreamer {
           .parseStripedBlockGroup((LocatedStripedBlock) updated,
               BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
       for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
-        final ExtendedBlock bi = coordinator.getStripedDataStreamer(i).getBlock();
+        StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
+        if (si.isFailed()) {
+          continue; // skipping failed data streamer
+        }
+        final ExtendedBlock bi = si.getBlock();
         if (bi != null) {
           final LocatedBlock lb = new LocatedBlock(newBlock(bi, newGS),
               null, null, null, -1, updated.isCorrupt(), null);
@@ -225,7 +245,11 @@ public class StripedDataStreamer extends DataStreamer {
       final ExtendedBlock newBG = newBlock(bg, newGS);
       final ExtendedBlock updated = callUpdatePipeline(bg, newBG);
       for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
-        final ExtendedBlock bi = coordinator.getStripedDataStreamer(i).getBlock();
+        StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
+        if (si.isFailed()) {
+          continue; // skipping failed data streamer
+        }
+        final ExtendedBlock bi = si.getBlock();
         updateBlocks.offer(i, newBlock(bi, updated.getGenerationStamp()));
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c31bf7a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
index fed9f16..f65d0c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.junit.Assert.assertEquals;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -33,6 +35,7 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
@@ -40,6 +43,7 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.security.token.Token;
@@ -145,6 +149,86 @@ public class TestDFSStripedOutputStreamWithFailure {
     }
   }
 
+  @Test(timeout = 90000)
+  public void testAddBlockWhenNoSufficientDataBlockNumOfNodes()
+      throws IOException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    try {
+      setup(conf);
+      ArrayList<DataNode> dataNodes = cluster.getDataNodes();
+      // shutdown few datanodes to avoid getting sufficient data blocks number
+      // of datanodes
+      int killDns = dataNodes.size() / 2;
+      int numDatanodes = dataNodes.size() - killDns;
+      for (int i = 0; i < killDns; i++) {
+        cluster.stopDataNode(i);
+      }
+      cluster.restartNameNodes();
+      cluster.triggerHeartbeats();
+      DatanodeInfo[] info = dfs.getClient().datanodeReport(
+          DatanodeReportType.LIVE);
+      assertEquals("Mismatches number of live Dns ", numDatanodes, info.length);
+      final Path dirFile = new Path(dir, "ecfile");
+      FSDataOutputStream out = null;
+      try {
+        out = dfs.create(dirFile, true);
+        out.write("something".getBytes());
+        out.flush();
+        out.close();
+        Assert.fail("Failed to validate available dns against blkGroupSize");
+      } catch (IOException ioe) {
+        // expected
+        GenericTestUtils.assertExceptionContains("Failed: the number of "
+            + "remaining blocks = 5 < the number of data blocks = 6", ioe);
+        DFSStripedOutputStream dfsout = (DFSStripedOutputStream) out
+            .getWrappedStream();
+
+        // get leading streamer and verify the last exception
+        StripedDataStreamer datastreamer = dfsout.getStripedDataStreamer(0);
+        try {
+          datastreamer.getLastException().check(true);
+          Assert.fail("Failed to validate available dns against blkGroupSize");
+        } catch (IOException le) {
+          GenericTestUtils.assertExceptionContains(
+              "Failed to get datablocks number of nodes from"
+                  + " namenode: blockGroupSize= 9, blocks.length= "
+                  + numDatanodes, le);
+        }
+      }
+    } finally {
+      tearDown();
+    }
+  }
+
+  @Test(timeout = 90000)
+  public void testAddBlockWhenNoSufficientParityNumOfNodes() throws IOException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    try {
+      setup(conf);
+      ArrayList<DataNode> dataNodes = cluster.getDataNodes();
+      // shutdown few data nodes to avoid writing parity blocks
+      int killDns = (NUM_PARITY_BLOCKS - 1);
+      int numDatanodes = dataNodes.size() - killDns;
+      for (int i = 0; i < killDns; i++) {
+        cluster.stopDataNode(i);
+      }
+      cluster.restartNameNodes();
+      cluster.triggerHeartbeats();
+      DatanodeInfo[] info = dfs.getClient().datanodeReport(
+          DatanodeReportType.LIVE);
+      assertEquals("Mismatches number of live Dns ", numDatanodes, info.length);
+      Path srcPath = new Path(dir, "testAddBlockWhenNoSufficientParityNodes");
+      int fileLength = HdfsConstants.BLOCK_STRIPED_CELL_SIZE - 1000;
+      final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
+      DFSTestUtil.writeFile(dfs, srcPath, new String(expected));
+      StripedFileTestUtil.verifySeek(dfs, srcPath, fileLength);
+    } finally {
+      tearDown();
+    }
+  }
+
   private void runTest(final Path p, final int length, final int killPos,
       final int dnIndex, final boolean tokenExpire) throws Exception {
     LOG.info("p=" + p + ", length=" + length + ", killPos=" + killPos