Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 257DA200B40 for ; Wed, 1 Jun 2016 21:55:23 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 24066160A4C; Wed, 1 Jun 2016 19:55:23 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 4EAAD160A4D for ; Wed, 1 Jun 2016 21:55:21 +0200 (CEST) Received: (qmail 72304 invoked by uid 500); 1 Jun 2016 19:55:04 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 69891 invoked by uid 99); 1 Jun 2016 19:55:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 01 Jun 2016 19:55:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 10EBCE9675; Wed, 1 Jun 2016 19:55:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aengineer@apache.org To: common-commits@hadoop.apache.org Date: Wed, 01 Jun 2016 19:55:40 -0000 Message-Id: <5fef8af82087454ba6bc8b341fbce6b9@git.apache.org> In-Reply-To: <8bf279bf59c84e0e9ac3691a144cadc2@git.apache.org> References: <8bf279bf59c84e0e9ac3691a144cadc2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [39/39] hadoop git commit: HDFS-9833. Erasure coding: recomputing block checksum on the fly by reconstructing the missed/corrupt block data. Contributed by Rakesh R. archived-at: Wed, 01 Jun 2016 19:55:23 -0000 HDFS-9833. 
Erasure coding: recomputing block checksum on the fly by reconstructing the missed/corrupt block data. Contributed by Rakesh R. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d749cf65 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d749cf65 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d749cf65 Branch: refs/heads/HDFS-1312 Commit: d749cf65e1ab0e0daf5be86931507183f189e855 Parents: 8ceb06e Author: Kai Zheng Authored: Thu Jun 2 12:56:21 2016 +0800 Committer: Kai Zheng Committed: Thu Jun 2 12:56:21 2016 +0800 ---------------------------------------------------------------------- .../apache/hadoop/hdfs/FileChecksumHelper.java | 3 +- .../hadoop/hdfs/protocol/StripedBlockInfo.java | 10 +- .../hdfs/protocol/datatransfer/Sender.java | 2 + .../hadoop/hdfs/protocolPB/PBHelperClient.java | 16 ++ .../src/main/proto/datatransfer.proto | 1 + .../hdfs/protocol/datatransfer/Receiver.java | 1 + .../server/datanode/BlockChecksumHelper.java | 172 ++++++++++++++----- .../erasurecode/ErasureCodingWorker.java | 15 +- .../StripedBlockChecksumReconstructor.java | 129 ++++++++++++++ .../erasurecode/StripedBlockReconstructor.java | 119 +++++++++++++ .../datanode/erasurecode/StripedReader.java | 22 +-- .../erasurecode/StripedReconstructionInfo.java | 99 +++++++++++ .../erasurecode/StripedReconstructor.java | 169 +++++++----------- .../datanode/erasurecode/StripedWriter.java | 29 ++-- .../hdfs/TestDecommissionWithStriped.java | 47 +++++ .../apache/hadoop/hdfs/TestFileChecksum.java | 41 ++++- 16 files changed, 675 insertions(+), 200 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java index dfd9393..c213fa3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java @@ -460,7 +460,8 @@ final class FileChecksumHelper { setRemaining(getRemaining() - block.getNumBytes()); StripedBlockInfo stripedBlockInfo = new StripedBlockInfo(block, - blockGroup.getLocations(), blockGroup.getBlockTokens(), ecPolicy); + blockGroup.getLocations(), blockGroup.getBlockTokens(), + blockGroup.getBlockIndices(), ecPolicy); DatanodeInfo[] datanodes = blockGroup.getLocations(); //try each datanode in the block group. http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/StripedBlockInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/StripedBlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/StripedBlockInfo.java index 74e8081..e46fabc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/StripedBlockInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/StripedBlockInfo.java @@ -32,14 +32,16 @@ public class StripedBlockInfo { private final ExtendedBlock block; private final DatanodeInfo[] datanodes; private final Token[] blockTokens; + private final byte[] blockIndices; private final ErasureCodingPolicy ecPolicy; public StripedBlockInfo(ExtendedBlock block, DatanodeInfo[] datanodes, - Token[] blockTokens, - ErasureCodingPolicy ecPolicy) { + Token[] blockTokens, 
byte[] blockIndices, + ErasureCodingPolicy ecPolicy) { this.block = block; this.datanodes = datanodes; this.blockTokens = blockTokens; + this.blockIndices = blockIndices; this.ecPolicy = ecPolicy; } @@ -55,6 +57,10 @@ public class StripedBlockInfo { return blockTokens; } + public byte[] getBlockIndices() { + return blockIndices; + } + public ErasureCodingPolicy getErasureCodingPolicy() { return ecPolicy; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java index 585ed99..bc73bfc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java @@ -274,6 +274,8 @@ public class Sender implements DataTransferProtocol { stripedBlockInfo.getDatanodes())) .addAllBlockTokens(PBHelperClient.convert( stripedBlockInfo.getBlockTokens())) + .addAllBlockIndices(PBHelperClient + .convertBlockIndices(stripedBlockInfo.getBlockIndices())) .setEcPolicy(PBHelperClient.convertErasureCodingPolicy( stripedBlockInfo.getErasureCodingPolicy())) .build(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java index a05567b..d5bb1e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java @@ -853,6 +853,22 @@ public class PBHelperClient { return results; } + public static List convertBlockIndices(byte[] blockIndices) { + List results = new ArrayList<>(blockIndices.length); + for (byte bt : blockIndices) { + results.add(Integer.valueOf(bt)); + } + return results; + } + + public static byte[] convertBlockIndices(List blockIndices) { + byte[] blkIndices = new byte[blockIndices.size()]; + for (int i = 0; i < blockIndices.size(); i++) { + blkIndices[i] = (byte) blockIndices.get(i).intValue(); + } + return blkIndices; + } + public static BlockStoragePolicy convert(BlockStoragePolicyProto proto) { List cList = proto.getCreationPolicy() .getStorageTypesList(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto index 522ee06..1407351 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto @@ -154,6 +154,7 @@ message OpBlockGroupChecksumProto { // each internal block has a block token repeated hadoop.common.TokenProto blockTokens = 3; required ErasureCodingPolicyProto ecPolicy = 4; + repeated uint32 blockIndices = 5; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java index b2f26f8..8b863f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java @@ -306,6 +306,7 @@ public abstract class Receiver implements DataTransferProtocol { PBHelperClient.convert(proto.getHeader().getBlock()), PBHelperClient.convert(proto.getDatanodes()), PBHelperClient.convertTokens(proto.getBlockTokensList()), + PBHelperClient.convertBlockIndices(proto.getBlockIndicesList()), PBHelperClient.convertErasureCodingPolicy(proto.getEcPolicy()) ); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java index 1f1a25c..ec6bbb6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hdfs.server.datanode; import com.google.common.base.Preconditions; + +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; @@ 
-30,6 +32,8 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.datanode.erasurecode.StripedBlockChecksumReconstructor; +import org.apache.hadoop.hdfs.server.datanode.erasurecode.StripedReconstructionInfo; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.io.DataOutputBuffer; @@ -46,11 +50,14 @@ import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.security.MessageDigest; +import java.util.HashMap; +import java.util.Map; /** * Utilities for Block checksum computing, for both replicated and striped * blocks. */ +@InterfaceAudience.Private final class BlockChecksumHelper { static final Logger LOG = LoggerFactory.getLogger(BlockChecksumHelper.class); @@ -327,6 +334,7 @@ final class BlockChecksumHelper { private final ErasureCodingPolicy ecPolicy; private final DatanodeInfo[] datanodes; private final Token[] blockTokens; + private final byte[] blockIndices; private final DataOutputBuffer md5writer = new DataOutputBuffer(); @@ -338,17 +346,61 @@ final class BlockChecksumHelper { this.ecPolicy = stripedBlockInfo.getErasureCodingPolicy(); this.datanodes = stripedBlockInfo.getDatanodes(); this.blockTokens = stripedBlockInfo.getBlockTokens(); + this.blockIndices = stripedBlockInfo.getBlockIndices(); + } + + private static class LiveBlockInfo { + private final DatanodeInfo dn; + private final Token token; + + LiveBlockInfo(DatanodeInfo dn, Token token) { + this.dn = dn; + this.token = token; + } + + DatanodeInfo getDn() { + return dn; + } + + Token getToken() { + return token; + } } @Override void compute() throws IOException { - for (int idx = 0; idx < ecPolicy.getNumDataUnits(); idx++) 
{ - ExtendedBlock block = - StripedBlockUtil.constructInternalBlock(blockGroup, - ecPolicy.getCellSize(), ecPolicy.getNumDataUnits(), idx); - DatanodeInfo targetDatanode = datanodes[idx]; - Token blockToken = blockTokens[idx]; - checksumBlock(block, idx, blockToken, targetDatanode); + assert datanodes.length == blockIndices.length; + + Map liveDns = new HashMap<>(datanodes.length); + int blkIndxLen = blockIndices.length; + int numDataUnits = ecPolicy.getNumDataUnits(); + // Prepare live datanode list. Missing data blocks will be reconstructed + // and recalculate checksum. + for (int idx = 0; idx < blkIndxLen; idx++) { + liveDns.put(blockIndices[idx], + new LiveBlockInfo(datanodes[idx], blockTokens[idx])); + } + for (int idx = 0; idx < numDataUnits && idx < blkIndxLen; idx++) { + try { + LiveBlockInfo liveBlkInfo = liveDns.get((byte) idx); + if (liveBlkInfo == null) { + // reconstruct block and calculate checksum for missing node + recalculateChecksum(idx); + } else { + try { + ExtendedBlock block = StripedBlockUtil.constructInternalBlock( + blockGroup, ecPolicy.getCellSize(), numDataUnits, idx); + checksumBlock(block, idx, liveBlkInfo.getToken(), + liveBlkInfo.getDn()); + } catch (IOException ioe) { + LOG.warn("Exception while reading checksum", ioe); + // reconstruct block and calculate checksum for the failed node + recalculateChecksum(idx); + } + } + } catch (IOException e) { + LOG.warn("Failed to get the checksum", e); + } } MD5Hash md5out = MD5Hash.digest(md5writer.getData()); @@ -379,52 +431,90 @@ final class BlockChecksumHelper { DataTransferProtos.OpBlockChecksumResponseProto checksumData = reply.getChecksumResponse(); - //read byte-per-checksum - final int bpc = checksumData.getBytesPerCrc(); - if (blockIdx == 0) { //first block - setBytesPerCRC(bpc); - } else if (bpc != getBytesPerCRC()) { - throw new IOException("Byte-per-checksum not matched: bpc=" + bpc - + " but bytesPerCRC=" + getBytesPerCRC()); - } - - //read crc-per-block - final long cpb = 
checksumData.getCrcPerBlock(); - if (blockIdx == 0) { - setCrcPerBlock(cpb); - } - - //read md5 - final MD5Hash md5 = new MD5Hash( - checksumData.getMd5().toByteArray()); - md5.write(md5writer); - // read crc-type final DataChecksum.Type ct; if (checksumData.hasCrcType()) { ct = PBHelperClient.convert(checksumData.getCrcType()); } else { - LOG.debug("Retrieving checksum from an earlier-version DataNode: " + - "inferring checksum by reading first byte"); + LOG.debug("Retrieving checksum from an earlier-version DataNode: " + + "inferring checksum by reading first byte"); ct = DataChecksum.Type.DEFAULT; } - if (blockIdx == 0) { // first block - setCrcType(ct); - } else if (getCrcType() != DataChecksum.Type.MIXED && - getCrcType() != ct) { - // if crc types are mixed in a file - setCrcType(DataChecksum.Type.MIXED); - } - + setOrVerifyChecksumProperties(blockIdx, checksumData.getBytesPerCrc(), + checksumData.getCrcPerBlock(), ct); + //read md5 + final MD5Hash md5 = new MD5Hash(checksumData.getMd5().toByteArray()); + md5.write(md5writer); if (LOG.isDebugEnabled()) { - if (blockIdx == 0) { - LOG.debug("set bytesPerCRC=" + getBytesPerCRC() - + ", crcPerBlock=" + getCrcPerBlock()); - } LOG.debug("got reply from " + targetDatanode + ": md5=" + md5); } } } + + /** + * Reconstruct this data block and recalculate checksum. + * + * @param errBlkIndex + * error index to be reconstrcuted and recalculate checksum. 
+ * @throws IOException + */ + private void recalculateChecksum(int errBlkIndex) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Recalculate checksum for the missing/failed block index " + + errBlkIndex); + } + byte[] errIndices = new byte[1]; + errIndices[0] = (byte) errBlkIndex; + StripedReconstructionInfo stripedReconInfo = + new StripedReconstructionInfo( + blockGroup, ecPolicy, blockIndices, datanodes, errIndices); + final StripedBlockChecksumReconstructor checksumRecon = + new StripedBlockChecksumReconstructor( + getDatanode().getErasureCodingWorker(), stripedReconInfo, + md5writer); + checksumRecon.reconstruct(); + + DataChecksum checksum = checksumRecon.getChecksum(); + long crcPerBlock = checksum.getChecksumSize() <= 0 ? 0 + : checksumRecon.getChecksumDataLen() / checksum.getChecksumSize(); + setOrVerifyChecksumProperties(errBlkIndex, checksum.getBytesPerChecksum(), + crcPerBlock, checksum.getChecksumType()); + if (LOG.isDebugEnabled()) { + LOG.debug("Recalculated checksum for the block index " + errBlkIndex + + ": md5=" + checksumRecon.getMD5()); + } + } + + private void setOrVerifyChecksumProperties(int blockIdx, int bpc, + final long cpb, DataChecksum.Type ct) throws IOException { + //read byte-per-checksum + if (blockIdx == 0) { //first block + setBytesPerCRC(bpc); + } else if (bpc != getBytesPerCRC()) { + throw new IOException("Byte-per-checksum not matched: bpc=" + bpc + + " but bytesPerCRC=" + getBytesPerCRC()); + } + + //read crc-per-block + if (blockIdx == 0) { + setCrcPerBlock(cpb); + } + + if (blockIdx == 0) { // first block + setCrcType(ct); + } else if (getCrcType() != DataChecksum.Type.MIXED && + getCrcType() != ct) { + // if crc types are mixed in a file + setCrcType(DataChecksum.Type.MIXED); + } + + if (LOG.isDebugEnabled()) { + if (blockIdx == 0) { + LOG.debug("set bytesPerCRC=" + getBytesPerCRC() + + ", crcPerBlock=" + getCrcPerBlock()); + } + } + } } } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java index e7c5abc..aacbb2d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java @@ -116,19 +116,24 @@ public final class ErasureCodingWorker { */ public void processErasureCodingTasks( Collection ecTasks) { - for (BlockECReconstructionInfo reconstructionInfo : ecTasks) { + for (BlockECReconstructionInfo reconInfo : ecTasks) { try { - final StripedReconstructor task = - new StripedReconstructor(this, reconstructionInfo); + StripedReconstructionInfo stripedReconInfo = + new StripedReconstructionInfo( + reconInfo.getExtendedBlock(), reconInfo.getErasureCodingPolicy(), + reconInfo.getLiveBlockIndices(), reconInfo.getSourceDnInfos(), + reconInfo.getTargetDnInfos(), reconInfo.getTargetStorageTypes()); + final StripedBlockReconstructor task = + new StripedBlockReconstructor(this, stripedReconInfo); if (task.hasValidTargets()) { stripedReconstructionPool.submit(task); } else { LOG.warn("No missing internal block. 
Skip reconstruction for task:{}", - reconstructionInfo); + reconInfo); } } catch (Throwable e) { LOG.warn("Failed to reconstruct striped block {}", - reconstructionInfo.getExtendedBlock().getLocalBlock(), e); + reconInfo.getExtendedBlock().getLocalBlock(), e); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java new file mode 100644 index 0000000..1b6758b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.datanode.erasurecode; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.MessageDigest; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.MD5Hash; + +/** + * StripedBlockChecksumReconstructor reconstruct one or more missed striped + * block in the striped block group, the minimum number of live striped blocks + * should be no less than data block number. Then checksum will be recalculated + * using the newly reconstructed block. + */ +@InterfaceAudience.Private +public class StripedBlockChecksumReconstructor extends StripedReconstructor { + + private ByteBuffer targetBuffer; + private final byte[] targetIndices; + + private byte[] checksumBuf; + private DataOutputBuffer checksumWriter; + private MD5Hash md5; + private long checksumDataLen; + + public StripedBlockChecksumReconstructor(ErasureCodingWorker worker, + StripedReconstructionInfo stripedReconInfo, + DataOutputBuffer checksumWriter) throws IOException { + super(worker, stripedReconInfo); + this.targetIndices = stripedReconInfo.getTargetIndices(); + assert targetIndices != null; + this.checksumWriter = checksumWriter; + init(); + } + + private void init() throws IOException { + getStripedReader().init(); + // allocate buffer to keep the reconstructed block data + targetBuffer = allocateBuffer(getBufferSize()); + long maxTargetLen = 0L; + for (int targetIndex : targetIndices) { + maxTargetLen = Math.max(maxTargetLen, getBlockLen(targetIndex)); + } + setMaxTargetLength(maxTargetLen); + int checksumSize = getChecksum().getChecksumSize(); + int bytesPerChecksum = getChecksum().getBytesPerChecksum(); + int tmpLen = checksumSize * (getBufferSize() / bytesPerChecksum); + checksumBuf = new byte[tmpLen]; + } + + public void reconstruct() throws IOException { + MessageDigest digester = MD5Hash.getDigester(); + while (getPositionInBlock() < 
getMaxTargetLength()) { + long remaining = getMaxTargetLength() - getPositionInBlock(); + final int toReconstructLen = (int) Math + .min(getStripedReader().getBufferSize(), remaining); + // step1: read from minimum source DNs required for reconstruction. + // The returned success list is the source DNs we do real read from + getStripedReader().readMinimumSources(toReconstructLen); + + // step2: decode to reconstruct targets + reconstructTargets(toReconstructLen); + + // step3: calculate checksum + getChecksum().calculateChunkedSums(targetBuffer.array(), 0, + targetBuffer.remaining(), checksumBuf, 0); + + // step4: updates the digest using the checksum array of bytes + digester.update(checksumBuf, 0, checksumBuf.length); + checksumDataLen += checksumBuf.length; + updatePositionInBlock(toReconstructLen); + clearBuffers(); + } + + byte[] digest = digester.digest(); + md5 = new MD5Hash(digest); + md5.write(checksumWriter); + } + + private void reconstructTargets(int toReconstructLen) { + initDecoderIfNecessary(); + + ByteBuffer[] inputs = getStripedReader().getInputBuffers(toReconstructLen); + + ByteBuffer[] outputs = new ByteBuffer[1]; + targetBuffer.limit(toReconstructLen); + outputs[0] = targetBuffer; + int[] tarIndices = new int[targetIndices.length]; + for (int i = 0; i < targetIndices.length; i++) { + tarIndices[i] = targetIndices[i]; + } + getDecoder().decode(inputs, tarIndices, outputs); + } + + /** + * Clear all associated buffers. 
+ */ + private void clearBuffers() { + getStripedReader().clearBuffers(); + targetBuffer.clear(); + } + + public MD5Hash getMD5() { + return md5; + } + + public long getChecksumDataLen() { + return checksumDataLen; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java new file mode 100644 index 0000000..b800bef --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.datanode.erasurecode; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * StripedBlockReconstructor reconstruct one or more missed striped block in + * the striped block group, the minimum number of live striped blocks should + * be no less than data block number. + */ +@InterfaceAudience.Private +class StripedBlockReconstructor extends StripedReconstructor + implements Runnable { + + private StripedWriter stripedWriter; + + StripedBlockReconstructor(ErasureCodingWorker worker, + StripedReconstructionInfo stripedReconInfo) { + super(worker, stripedReconInfo); + + stripedWriter = new StripedWriter(this, getDatanode(), + getConf(), stripedReconInfo); + } + + boolean hasValidTargets() { + return stripedWriter.hasValidTargets(); + } + + @Override + public void run() { + getDatanode().incrementXmitsInProgress(); + try { + getStripedReader().init(); + + stripedWriter.init(); + + reconstruct(); + + stripedWriter.endTargetBlocks(); + + // Currently we don't check the acks for packets, this is similar as + // block replication. + } catch (Throwable e) { + LOG.warn("Failed to reconstruct striped block: {}", getBlockGroup(), e); + getDatanode().getMetrics().incrECFailedReconstructionTasks(); + } finally { + getDatanode().decrementXmitsInProgress(); + getDatanode().getMetrics().incrECReconstructionTasks(); + getStripedReader().close(); + stripedWriter.close(); + } + } + + void reconstruct() throws IOException { + while (getPositionInBlock() < getMaxTargetLength()) { + long remaining = getMaxTargetLength() - getPositionInBlock(); + final int toReconstructLen = + (int) Math.min(getStripedReader().getBufferSize(), remaining); + + // step1: read from minimum source DNs required for reconstruction. 
+ // The returned success list is the source DNs we do real read from + getStripedReader().readMinimumSources(toReconstructLen); + + // step2: decode to reconstruct targets + reconstructTargets(toReconstructLen); + + // step3: transfer data + if (stripedWriter.transferData2Targets() == 0) { + String error = "Transfer failed for all targets."; + throw new IOException(error); + } + + updatePositionInBlock(toReconstructLen); + + clearBuffers(); + } + } + + private void reconstructTargets(int toReconstructLen) { + initDecoderIfNecessary(); + + ByteBuffer[] inputs = getStripedReader().getInputBuffers(toReconstructLen); + + int[] erasedIndices = stripedWriter.getRealTargetIndices(); + ByteBuffer[] outputs = stripedWriter.getRealTargetBuffers(toReconstructLen); + + getDecoder().decode(inputs, erasedIndices, outputs); + + stripedWriter.updateRealTargetBuffers(toReconstructLen); + } + + /** + * Clear all associated buffers. + */ + private void clearBuffers() { + getStripedReader().clearBuffers(); + + stripedWriter.clearBuffers(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java index fb7699a..e6d4ceb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java @@ -23,11 +23,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks; import 
org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult; import org.apache.hadoop.util.DataChecksum; @@ -85,8 +82,7 @@ class StripedReader { private final CompletionService readService; StripedReader(StripedReconstructor reconstructor, DataNode datanode, - Configuration conf, - BlockECReconstructionInfo reconstructionInfo) { + Configuration conf, StripedReconstructionInfo stripedReconInfo) { stripedReadTimeoutInMills = conf.getInt( DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY, DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT); @@ -98,13 +94,11 @@ class StripedReader { this.datanode = datanode; this.conf = conf; - ErasureCodingPolicy ecPolicy = reconstructionInfo.getErasureCodingPolicy(); - dataBlkNum = ecPolicy.getNumDataUnits(); - parityBlkNum = ecPolicy.getNumParityUnits(); + dataBlkNum = stripedReconInfo.getEcPolicy().getNumDataUnits(); + parityBlkNum = stripedReconInfo.getEcPolicy().getNumParityUnits(); - ExtendedBlock blockGroup = reconstructionInfo.getExtendedBlock(); - int cellsNum = (int)((blockGroup.getNumBytes() - 1) / ecPolicy.getCellSize() - + 1); + int cellsNum = (int) ((stripedReconInfo.getBlockGroup().getNumBytes() - 1) + / stripedReconInfo.getEcPolicy().getCellSize() + 1); minRequiredSources = Math.min(cellsNum, dataBlkNum); if (minRequiredSources < dataBlkNum) { @@ -113,8 +107,10 @@ class StripedReader { zeroStripeIndices = new short[zeroStripNum]; } - liveIndices = reconstructionInfo.getLiveBlockIndices(); - sources = 
reconstructionInfo.getSourceDnInfos(); + this.liveIndices = stripedReconInfo.getLiveIndices(); + assert liveIndices != null; + this.sources = stripedReconInfo.getSources(); + assert sources != null; readers = new ArrayList<>(sources.length); readService = reconstructor.createReadService(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java new file mode 100644 index 0000000..a5c328b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.datanode.erasurecode; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; + +/** + * Stores striped block info that can be used for block reconstruction. + */ +@InterfaceAudience.Private +public class StripedReconstructionInfo { + + private final ExtendedBlock blockGroup; + private final ErasureCodingPolicy ecPolicy; + + // source info + private final byte[] liveIndices; + private final DatanodeInfo[] sources; + + // target info + private final byte[] targetIndices; + private final DatanodeInfo[] targets; + private final StorageType[] targetStorageTypes; + + public StripedReconstructionInfo(ExtendedBlock blockGroup, + ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources, + byte[] targetIndices) { + this(blockGroup, ecPolicy, liveIndices, sources, targetIndices, null, null); + } + + StripedReconstructionInfo(ExtendedBlock blockGroup, + ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources, + DatanodeInfo[] targets, StorageType[] targetStorageTypes) { + this(blockGroup, ecPolicy, liveIndices, sources, null, targets, + targetStorageTypes); + } + + private StripedReconstructionInfo(ExtendedBlock blockGroup, + ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources, + byte[] targetIndices, DatanodeInfo[] targets, + StorageType[] targetStorageTypes) { + + this.blockGroup = blockGroup; + this.ecPolicy = ecPolicy; + this.liveIndices = liveIndices; + this.sources = sources; + this.targetIndices = targetIndices; + this.targets = targets; + this.targetStorageTypes = targetStorageTypes; + } + + ExtendedBlock getBlockGroup() { + return blockGroup; + } + + ErasureCodingPolicy getEcPolicy() { + return ecPolicy; + } + + byte[] getLiveIndices() { + return 
liveIndices; + } + + DatanodeInfo[] getSources() { + return sources; + } + + byte[] getTargetIndices() { + return targetIndices; + } + + DatanodeInfo[] getTargets() { + return targets; + } + + StorageType[] getTargetStorageTypes() { + return targetStorageTypes; + } +} + http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java index 47a6979..782d091 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java @@ -24,7 +24,6 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; @@ -39,6 +38,7 @@ import java.nio.ByteBuffer; import java.util.BitSet; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ThreadPoolExecutor; /** * StripedReconstructor reconstruct one or more missed striped block in the @@ -94,58 +94,50 @@ import java.util.concurrent.ExecutorCompletionService; * reconstructed result 
received by targets? */ @InterfaceAudience.Private -class StripedReconstructor implements Runnable { - private static final Logger LOG = DataNode.LOG; +abstract class StripedReconstructor { + protected static final Logger LOG = DataNode.LOG; - private final ErasureCodingWorker worker; - private final DataNode datanode; private final Configuration conf; - + private final DataNode datanode; private final ErasureCodingPolicy ecPolicy; - private RawErasureDecoder decoder; - private final ExtendedBlock blockGroup; - private final BitSet liveBitSet; // position in striped internal block private long positionInBlock; - private StripedReader stripedReader; - - private StripedWriter stripedWriter; - + private ThreadPoolExecutor stripedReadPool; private final CachingStrategy cachingStrategy; + private long maxTargetLength = 0L; + private final BitSet liveBitSet; StripedReconstructor(ErasureCodingWorker worker, - BlockECReconstructionInfo reconstructionInfo) { - this.worker = worker; + StripedReconstructionInfo stripedReconInfo) { + this.stripedReadPool = worker.getStripedReadPool(); this.datanode = worker.getDatanode(); this.conf = worker.getConf(); - - ecPolicy = reconstructionInfo.getErasureCodingPolicy(); - - blockGroup = reconstructionInfo.getExtendedBlock(); - byte[] liveIndices = reconstructionInfo.getLiveBlockIndices(); - liveBitSet = new BitSet(ecPolicy.getNumDataUnits() + - ecPolicy.getNumParityUnits()); - for (int i = 0; i < liveIndices.length; i++) { - liveBitSet.set(liveIndices[i]); + this.ecPolicy = stripedReconInfo.getEcPolicy(); + liveBitSet = new BitSet( + ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits()); + for (int i = 0; i < stripedReconInfo.getLiveIndices().length; i++) { + liveBitSet.set(stripedReconInfo.getLiveIndices()[i]); } - - stripedReader = new StripedReader(this, datanode, - conf, reconstructionInfo); - stripedWriter = new StripedWriter(this, datanode, - conf, reconstructionInfo); + blockGroup = stripedReconInfo.getBlockGroup(); + 
stripedReader = new StripedReader(this, datanode, conf, stripedReconInfo); cachingStrategy = CachingStrategy.newDefaultStrategy(); positionInBlock = 0L; } - BitSet getLiveBitSet() { - return liveBitSet; - } + /** + * Reconstruct one or more missed striped block in the striped block group, + * the minimum number of live striped blocks should be no less than data + * block number. + * + * @throws IOException + */ + abstract void reconstruct() throws IOException; ByteBuffer allocateBuffer(int length) { return ByteBuffer.allocate(length); @@ -160,61 +152,8 @@ class StripedReconstructor implements Runnable { ecPolicy, i); } - boolean hasValidTargets() { - return stripedWriter.hasValidTargets(); - } - - @Override - public void run() { - datanode.incrementXmitsInProgress(); - try { - stripedReader.init(); - - stripedWriter.init(); - - reconstructAndTransfer(); - - stripedWriter.endTargetBlocks(); - - // Currently we don't check the acks for packets, this is similar as - // block replication. - } catch (Throwable e) { - LOG.warn("Failed to reconstruct striped block: {}", blockGroup, e); - datanode.getMetrics().incrECFailedReconstructionTasks(); - } finally { - datanode.decrementXmitsInProgress(); - datanode.getMetrics().incrECReconstructionTasks(); - stripedReader.close(); - stripedWriter.close(); - } - } - - void reconstructAndTransfer() throws IOException { - while (positionInBlock < stripedWriter.getMaxTargetLength()) { - long remaining = stripedWriter.getMaxTargetLength() - positionInBlock; - final int toReconstructLen = - (int) Math.min(stripedReader.getBufferSize(), remaining); - // step1: read from minimum source DNs required for reconstruction. 
- // The returned success list is the source DNs we do real read from - stripedReader.readMinimumSources(toReconstructLen); - - // step2: decode to reconstruct targets - reconstructTargets(toReconstructLen); - - // step3: transfer data - if (stripedWriter.transferData2Targets() == 0) { - String error = "Transfer failed for all targets."; - throw new IOException(error); - } - - positionInBlock += toReconstructLen; - - clearBuffers(); - } - } - // Initialize decoder - private void initDecoderIfNecessary() { + protected void initDecoderIfNecessary() { if (decoder == null) { ErasureCoderOptions coderOptions = new ErasureCoderOptions( ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits()); @@ -223,32 +162,10 @@ class StripedReconstructor implements Runnable { } } - private void reconstructTargets(int toReconstructLen) { - initDecoderIfNecessary(); - - ByteBuffer[] inputs = stripedReader.getInputBuffers(toReconstructLen); - - int[] erasedIndices = stripedWriter.getRealTargetIndices(); - ByteBuffer[] outputs = stripedWriter.getRealTargetBuffers(toReconstructLen); - - decoder.decode(inputs, erasedIndices, outputs); - - stripedWriter.updateRealTargetBuffers(toReconstructLen); - } - long getPositionInBlock() { return positionInBlock; } - /** - * Clear all associated buffers. 
- */ - private void clearBuffers() { - stripedReader.clearBuffers(); - - stripedWriter.clearBuffers(); - } - InetSocketAddress getSocketAddress4Transfer(DatanodeInfo dnInfo) { return NetUtils.createSocketAddr(dnInfo.getXferAddr( datanode.getDnConf().getConnectToDnViaHostname())); @@ -258,7 +175,7 @@ class StripedReconstructor implements Runnable { return stripedReader.getBufferSize(); } - DataChecksum getChecksum() { + public DataChecksum getChecksum() { return stripedReader.getChecksum(); } @@ -267,10 +184,42 @@ class StripedReconstructor implements Runnable { } CompletionService createReadService() { - return new ExecutorCompletionService<>(worker.getStripedReadPool()); + return new ExecutorCompletionService<>(stripedReadPool); } ExtendedBlock getBlockGroup() { return blockGroup; } + + BitSet getLiveBitSet() { + return liveBitSet; + } + + long getMaxTargetLength() { + return maxTargetLength; + } + + void setMaxTargetLength(long maxTargetLength) { + this.maxTargetLength = maxTargetLength; + } + + void updatePositionInBlock(long positionInBlockArg) { + this.positionInBlock += positionInBlockArg; + } + + RawErasureDecoder getDecoder() { + return decoder; + } + + StripedReader getStripedReader() { + return stripedReader; + } + + Configuration getConf() { + return conf; + } + + DataNode getDatanode() { + return datanode; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java index e2052a3..ca7a3a8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java @@ -22,11 +22,9 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; import org.apache.hadoop.util.DataChecksum; import org.slf4j.Logger; @@ -57,7 +55,6 @@ class StripedWriter { private final short[] targetIndices; private boolean hasValidTargets; private final StorageType[] targetStorageTypes; - private long maxTargetLength; private StripedBlockWriter[] writers; @@ -67,20 +64,19 @@ class StripedWriter { private int bytesPerChecksum; private int checksumSize; - StripedWriter(StripedReconstructor reconstructor, - DataNode datanode, - Configuration conf, - BlockECReconstructionInfo reconstructionInfo) { + StripedWriter(StripedReconstructor reconstructor, DataNode datanode, + Configuration conf, StripedReconstructionInfo stripedReconInfo) { this.reconstructor = reconstructor; this.datanode = datanode; this.conf = conf; - ErasureCodingPolicy ecPolicy = reconstructionInfo.getErasureCodingPolicy(); - dataBlkNum = ecPolicy.getNumDataUnits(); - parityBlkNum = ecPolicy.getNumParityUnits(); + dataBlkNum = stripedReconInfo.getEcPolicy().getNumDataUnits(); + parityBlkNum = stripedReconInfo.getEcPolicy().getNumParityUnits(); - targets = reconstructionInfo.getTargetDnInfos(); - targetStorageTypes = reconstructionInfo.getTargetStorageTypes(); + this.targets = stripedReconInfo.getTargets(); + assert targets != null; + this.targetStorageTypes = 
stripedReconInfo.getTargetStorageTypes(); + assert targetStorageTypes != null; writers = new StripedBlockWriter[targets.length]; @@ -88,12 +84,12 @@ class StripedWriter { Preconditions.checkArgument(targetIndices.length <= parityBlkNum, "Too much missed striped blocks."); initTargetIndices(); - - maxTargetLength = 0L; + long maxTargetLength = 0L; for (short targetIndex : targetIndices) { maxTargetLength = Math.max(maxTargetLength, reconstructor.getBlockLen(targetIndex)); } + reconstructor.setMaxTargetLength(maxTargetLength); // targetsStatus store whether some target is success, it will record // any failed target once, if some target failed (invalid DN or transfer @@ -126,7 +122,6 @@ class StripedWriter { BitSet bitset = reconstructor.getLiveBitSet(); int m = 0; - int k = 0; hasValidTargets = false; for (int i = 0; i < dataBlkNum + parityBlkNum; i++) { if (!bitset.get(i)) { @@ -257,10 +252,6 @@ class StripedWriter { } } - long getMaxTargetLength() { - return maxTargetLength; - } - byte[] getChecksumBuf() { return checksumBuf; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java index 598e76f..d223354 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java @@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileChecksum; import 
org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSClient; @@ -276,6 +277,52 @@ public class TestDecommissionWithStriped { cleanupFile(dfs, ecFile); } + /** + * Tests to verify that the file checksum can still be computed after the + * decommission operation. + * + * Below is the block indices list after the decommission. ' represents + * decommissioned node index. + * + * 0, 2, 3, 4, 5, 6, 7, 8, 1, 1' + * + * Here, this list contains duplicated blocks and does not maintain any + * order. + */ + @Test(timeout = 120000) + public void testFileChecksumAfterDecommission() throws Exception { + LOG.info("Starting test testFileChecksumAfterDecommission"); + + final Path ecFile = new Path(ecDir, "testFileChecksumAfterDecommission"); + int writeBytes = BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS; + writeStripedFile(dfs, ecFile, writeBytes); + Assert.assertEquals(0, bm.numOfUnderReplicatedBlocks()); + FileChecksum fileChecksum1 = dfs.getFileChecksum(ecFile, writeBytes); + + final List decommisionNodes = new ArrayList(); + LocatedBlock lb = dfs.getClient().getLocatedBlocks(ecFile.toString(), 0) + .get(0); + DatanodeInfo[] dnLocs = lb.getLocations(); + assertEquals(NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS, dnLocs.length); + int decommNodeIndex = 1; + + // add the node which will be decommissioned + decommisionNodes.add(dnLocs[decommNodeIndex]); + decommissionNode(0, decommisionNodes, AdminStates.DECOMMISSIONED); + assertEquals(decommisionNodes.size(), fsn.getNumDecomLiveDataNodes()); + assertNull(checkFile(dfs, ecFile, 9, decommisionNodes, numDNs)); + StripedFileTestUtil.checkData(dfs, ecFile, writeBytes, decommisionNodes, + null); + + // verify checksum + FileChecksum fileChecksum2 = dfs.getFileChecksum(ecFile, writeBytes); + LOG.info("fileChecksum1:" + fileChecksum1); + LOG.info("fileChecksum2:" + fileChecksum2); + + Assert.assertTrue("Checksum mismatches!", + fileChecksum1.equals(fileChecksum2)); + } + private void 
testDecommission(int writeBytes, int storageCount, int decomNodeCount, String filename) throws IOException, Exception { Path ecFile = new Path(ecDir, filename); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d749cf65/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java index 7cee344..3bee6be 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java @@ -163,17 +163,40 @@ public class TestFileChecksum { Assert.assertFalse(stripedFileChecksum1.equals(replicatedFileChecksum)); } - /* - // TODO: allow datanode failure, HDFS-9833 @Test - public void testStripedAndReplicatedWithFailure() throws Exception { - FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1, - 10, true); - FileChecksum replicatedFileChecksum = getFileChecksum(replicatedFile, - 10, true); + public void testStripedFileChecksumWithMissedDataBlocks1() throws Exception { + FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1, fileSize, + false); + FileChecksum stripedFileChecksumRecon = getFileChecksum(stripedFile1, + fileSize, true); - Assert.assertFalse(stripedFileChecksum1.equals(replicatedFileChecksum)); - }*/ + LOG.info("stripedFileChecksum1:" + stripedFileChecksum1); + LOG.info("stripedFileChecksumRecon:" + stripedFileChecksumRecon); + + Assert.assertTrue("Checksum mismatches!", + stripedFileChecksum1.equals(stripedFileChecksumRecon)); + } + + @Test + public void testStripedFileChecksumWithMissedDataBlocks2() throws Exception { + FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1, -1, + false); + FileChecksum 
stripedFileChecksum2 = getFileChecksum(stripedFile2, -1, + false); + FileChecksum stripedFileChecksum2Recon = getFileChecksum(stripedFile2, -1, + true); + + LOG.info("stripedFileChecksum1:" + stripedFileChecksum1); + LOG.info("stripedFileChecksum2:" + stripedFileChecksum2); + LOG.info("stripedFileChecksum2Recon:" + stripedFileChecksum2Recon); + + Assert.assertTrue("Checksum mismatches!", + stripedFileChecksum1.equals(stripedFileChecksum2)); + Assert.assertTrue("Checksum mismatches!", + stripedFileChecksum1.equals(stripedFileChecksum2Recon)); + Assert.assertTrue("Checksum mismatches!", + stripedFileChecksum2.equals(stripedFileChecksum2Recon)); + } private FileChecksum getFileChecksum(String filePath, int range, boolean killDn) throws Exception { --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org