From: zhz@apache.org
To: common-commits@hadoop.apache.org
Date: Fri, 14 Aug 2015 17:55:34 -0000
Subject: [22/36] hadoop git commit: HDFS-7285. Erasure Coding Support inside HDFS.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java index 3f242e0..bfb8dcc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.namenode; import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED; +import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION; import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID; import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.NO_SNAPSHOT_ID; @@ -34,13 +35,16 @@ import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import 
org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiff; import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiffList; import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshotFeature; @@ -79,12 +83,14 @@ public class INodeFile extends INodeWithAdditionalFields /** * Bit format: - * [4-bit storagePolicyID][12-bit replication][48-bit preferredBlockSize] + * [4-bit storagePolicyID][1-bit isStriped] + * [11-bit replication][48-bit preferredBlockSize] */ static enum HeaderFormat { PREFERRED_BLOCK_SIZE(null, 48, 1), - REPLICATION(PREFERRED_BLOCK_SIZE.BITS, 12, 1), - STORAGE_POLICY_ID(REPLICATION.BITS, BlockStoragePolicySuite.ID_BIT_LENGTH, + REPLICATION(PREFERRED_BLOCK_SIZE.BITS, 11, 0), + IS_STRIPED(REPLICATION.BITS, 1, 0), + STORAGE_POLICY_ID(IS_STRIPED.BITS, BlockStoragePolicySuite.ID_BIT_LENGTH, 0); private final LongBitFormat BITS; @@ -105,14 +111,27 @@ public class INodeFile extends INodeWithAdditionalFields return (byte)STORAGE_POLICY_ID.BITS.retrieve(header); } + static boolean isStriped(long header) { + long isStriped = IS_STRIPED.BITS.retrieve(header); + Preconditions.checkState(isStriped == 0 || isStriped == 1); + return isStriped == 1; + } + static long toLong(long preferredBlockSize, short replication, - byte storagePolicyID) { + boolean isStriped, byte storagePolicyID) { long h = 0; if (preferredBlockSize == 0) { preferredBlockSize = PREFERRED_BLOCK_SIZE.BITS.getMin(); } h = PREFERRED_BLOCK_SIZE.BITS.combine(preferredBlockSize, h); - h = REPLICATION.BITS.combine(replication, h); + // Replication factor for striped files is zero + if (isStriped) { + h = REPLICATION.BITS.combine(0L, h); + h = IS_STRIPED.BITS.combine(1L, h); + } else { + h = REPLICATION.BITS.combine(replication, h); + h = IS_STRIPED.BITS.combine(0L, h); + } h = STORAGE_POLICY_ID.BITS.combine(storagePolicyID, h); return h; } @@ -127,15 +146,21 @@ public class INodeFile extends INodeWithAdditionalFields long atime, BlockInfo[] blklist, short replication, long preferredBlockSize) { this(id, name, permissions, mtime, atime, blklist, replication, - preferredBlockSize, (byte) 0); + preferredBlockSize, (byte) 0, false); } INodeFile(long id, byte[] name, PermissionStatus permissions, long mtime, long atime, BlockInfo[] blklist, short replication, - long preferredBlockSize, byte storagePolicyID) { + long preferredBlockSize, byte storagePolicyID, boolean isStriped) { super(id, name, permissions, mtime, atime); - header = HeaderFormat.toLong(preferredBlockSize, replication, storagePolicyID); - this.blocks = blklist; + header = HeaderFormat.toLong(preferredBlockSize, replication, isStriped, + storagePolicyID); + if (blklist != null && blklist.length > 0) { + for (BlockInfo b : blklist) { + Preconditions.checkArgument(b.isStriped() == isStriped); + } + } + setBlocks(blklist); } public INodeFile(INodeFile that) { @@ -227,31 +252,37 @@ public class INodeFile extends INodeWithAdditionalFields @Override // BlockCollection public void setBlock(int index, BlockInfo blk) { + Preconditions.checkArgument(blk.isStriped() == this.isStriped()); this.blocks[index] = blk; } @Override // BlockCollection, the file should be under construction - public BlockInfoContiguousUnderConstruction setLastBlock( - BlockInfo lastBlock, DatanodeStorageInfo[] locations) - throws IOException { + public void convertLastBlockToUC(BlockInfo lastBlock, + DatanodeStorageInfo[] locations) 
throws IOException { Preconditions.checkState(isUnderConstruction(), "file is no longer under construction"); - if (numBlocks() == 0) { throw new IOException("Failed to set last block: File is empty."); } - BlockInfoContiguousUnderConstruction ucBlock = - lastBlock.convertToBlockUnderConstruction( - BlockUCState.UNDER_CONSTRUCTION, locations); + + final BlockInfo ucBlock; + if (isStriped()) { + Preconditions.checkState(lastBlock.isStriped()); + ucBlock = ((BlockInfoStriped) lastBlock) + .convertToBlockUnderConstruction(UNDER_CONSTRUCTION, locations); + } else { + Preconditions.checkState(!lastBlock.isStriped()); + ucBlock = ((BlockInfoContiguous) lastBlock) + .convertToBlockUnderConstruction(UNDER_CONSTRUCTION, locations); + } setBlock(numBlocks() - 1, ucBlock); - return ucBlock; } /** * Remove a block from the block list. This block should be * the last one on the list. */ - BlockInfoContiguousUnderConstruction removeLastBlock(Block oldblock) { + BlockInfoUnderConstruction removeLastBlock(Block oldblock) { Preconditions.checkState(isUnderConstruction(), "file is no longer under construction"); if (blocks == null || blocks.length == 0) { @@ -262,8 +293,8 @@ public class INodeFile extends INodeWithAdditionalFields return null; } - BlockInfoContiguousUnderConstruction uc = - (BlockInfoContiguousUnderConstruction)blocks[size_1]; + BlockInfoUnderConstruction uc = + (BlockInfoUnderConstruction)blocks[size_1]; //copy to a new list BlockInfo[] newlist = new BlockInfo[size_1]; System.arraycopy(blocks, 0, newlist, 0, size_1); @@ -350,6 +381,7 @@ public class INodeFile extends INodeWithAdditionalFields /** The same as getFileReplication(null). */ @Override // INodeFileAttributes + // TODO properly handle striped files public final short getFileReplication() { return getFileReplication(CURRENT_STATE_ID); } @@ -365,7 +397,8 @@ public class INodeFile extends INodeWithAdditionalFields } max = maxInSnapshot > max ? maxInSnapshot : max; } - return max; + return isStriped() ? + HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS : max; } /** Set the replication factor of this file. */ @@ -413,6 +446,15 @@ public class INodeFile extends INodeWithAdditionalFields setStoragePolicyID(storagePolicyId); } + /** + * @return true if the file is in the striping layout. + */ + @VisibleForTesting + @Override + public boolean isStriped() { + return HeaderFormat.isStriped(header); + } + @Override // INodeFileAttributes public long getHeaderLong() { return header; @@ -429,7 +471,9 @@ public class INodeFile extends INodeWithAdditionalFields if(snapshot == CURRENT_STATE_ID || getDiffs() == null) { return getBlocks(); } + // find blocks stored in snapshot diffs (for truncate) FileDiff diff = getDiffs().getDiffById(snapshot); + // note that currently FileDiff can only store contiguous blocks BlockInfo[] snapshotBlocks = diff == null ? 
getBlocks() : diff.getBlocks(); if (snapshotBlocks != null) { return snapshotBlocks; @@ -456,6 +500,7 @@ public class INodeFile extends INodeWithAdditionalFields int size = this.blocks.length; int totalAddedBlocks = 0; for(INodeFile f : inodes) { + Preconditions.checkState(f.isStriped() == this.isStriped()); totalAddedBlocks += f.blocks.length; } @@ -476,6 +521,7 @@ public class INodeFile extends INodeWithAdditionalFields * add a block to the block list */ void addBlock(BlockInfo newblock) { + Preconditions.checkArgument(newblock.isStriped() == this.isStriped()); if (this.blocks == null) { this.setBlocks(new BlockInfo[]{newblock}); } else { @@ -589,6 +635,10 @@ public class INodeFile extends INodeWithAdditionalFields final long ssDeltaNoReplication; short replication; + if (isStriped()) { + return computeQuotaUsageWithStriped(bsp, counts); + } + if (last < lastSnapshotId) { ssDeltaNoReplication = computeFileSize(true, false); replication = getFileReplication(); @@ -611,6 +661,18 @@ public class INodeFile extends INodeWithAdditionalFields return counts; } + /** + * Compute quota of striped file. Note that currently EC files do not support + * append/hflush/hsync, thus the file length recorded in snapshots should be + * the same with the current file length. + */ + public final QuotaCounts computeQuotaUsageWithStriped( + BlockStoragePolicy bsp, QuotaCounts counts) { + counts.addNameSpace(1); + counts.add(storagespaceConsumed(bsp)); + return counts; + } + @Override public final ContentSummaryComputationContext computeContentSummary( final ContentSummaryComputationContext summary) { @@ -695,16 +757,20 @@ public class INodeFile extends INodeWithAdditionalFields } final int last = blocks.length - 1; //check if the last block is BlockInfoUnderConstruction - long size = blocks[last].getNumBytes(); - if (blocks[last] instanceof BlockInfoContiguousUnderConstruction) { + BlockInfo lastBlk = blocks[last]; + long size = lastBlk.getNumBytes(); + if (lastBlk instanceof BlockInfoUnderConstruction) { if (!includesLastUcBlock) { size = 0; } else if (usePreferredBlockSize4LastUcBlock) { - size = getPreferredBlockSize(); + size = isStriped()? + getPreferredBlockSize() * + ((BlockInfoStriped)lastBlk).getDataBlockNum() : + getPreferredBlockSize(); } } //sum other blocks - for(int i = 0; i < last; i++) { + for (int i = 0; i < last; i++) { size += blocks[i].getNumBytes(); } return size; @@ -716,6 +782,32 @@ public class INodeFile extends INodeWithAdditionalFields * Use preferred block size for the last block if it is under construction. */ public final QuotaCounts storagespaceConsumed(BlockStoragePolicy bsp) { + if (isStriped()) { + return storagespaceConsumedStriped(); + } else { + return storagespaceConsumedContiguous(bsp); + } + } + + // TODO: support EC with heterogeneous storage + public final QuotaCounts storagespaceConsumedStriped() { + QuotaCounts counts = new QuotaCounts.Builder().build(); + if (blocks == null || blocks.length == 0) { + return counts; + } + + for (BlockInfo b : blocks) { + Preconditions.checkState(b.isStriped()); + long blockSize = b.isComplete() ? 
+ ((BlockInfoStriped)b).spaceConsumed() : getPreferredBlockSize() * + ((BlockInfoStriped)b).getTotalBlockNum(); + counts.addStorageSpace(blockSize); + } + return counts; + } + + public final QuotaCounts storagespaceConsumedContiguous( + BlockStoragePolicy bsp) { QuotaCounts counts = new QuotaCounts.Builder().build(); final Iterable blocks; FileWithSnapshotFeature sf = getFileWithSnapshotFeature(); @@ -819,6 +911,7 @@ public class INodeFile extends INodeWithAdditionalFields /** * compute the quota usage change for a truncate op * @param newLength the length for truncation + * TODO: properly handle striped blocks (HDFS-7622) **/ void computeQuotaDeltaForTruncate( long newLength, BlockStoragePolicy bsps, @@ -883,8 +976,15 @@ public class INodeFile extends INodeWithAdditionalFields setBlocks(newBlocks); } + /** + * This function is only called when block list is stored in snapshot + * diffs. Note that this can only happen when truncation happens with + * snapshots. Since we do not support truncation with striped blocks, + * we only need to handle contiguous blocks here. + */ public void collectBlocksBeyondSnapshot(BlockInfo[] snapshotBlocks, BlocksMapUpdateInfo collectedBlocks) { + Preconditions.checkState(!isStriped()); BlockInfo[] oldBlocks = getBlocks(); if(snapshotBlocks == null || oldBlocks == null) return; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java index 204c8ac..13bd9e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java @@ -29,6 +29,9 @@ public interface INodeFileAttributes extends INodeAttributes { /** @return the file replication. 
*/ public short getFileReplication(); + /** @return whether the file is striped (instead of contiguous) */ + public boolean isStriped(); + /** @return preferred block size in bytes */ public long getPreferredBlockSize(); @@ -47,10 +50,10 @@ public interface INodeFileAttributes extends INodeAttributes { public SnapshotCopy(byte[] name, PermissionStatus permissions, AclFeature aclFeature, long modificationTime, long accessTime, short replication, long preferredBlockSize, - byte storagePolicyID, XAttrFeature xAttrsFeature) { + byte storagePolicyID, XAttrFeature xAttrsFeature, boolean isStriped) { super(name, permissions, aclFeature, modificationTime, accessTime, xAttrsFeature); - header = HeaderFormat.toLong(preferredBlockSize, replication, + header = HeaderFormat.toLong(preferredBlockSize, replication, isStriped, storagePolicyID); } @@ -70,6 +73,11 @@ public interface INodeFileAttributes extends INodeAttributes { } @Override + public boolean isStriped() { + return HeaderFormat.isStriped(header); + } + + @Override public long getPreferredBlockSize() { return HeaderFormat.getPreferredBlockSize(header); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java index 1a1edaf..244dafe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java @@ -116,8 +116,8 @@ public class LeaseManager { for(BlockInfo b : blocks) { if(!b.isComplete()) numUCBlocks++; + } } - } LOG.info("Number of blocks under construction: " + numUCBlocks); return numUCBlocks; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java index 3a5dc12..2943fc2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java @@ -87,7 +87,8 @@ public class NameNodeLayoutVersion { BLOCK_STORAGE_POLICY(-60, -60, "Block Storage policy"), TRUNCATE(-61, -61, "Truncate"), APPEND_NEW_BLOCK(-62, -61, "Support appending to new block"), - QUOTA_BY_STORAGE_TYPE(-63, -61, "Support quota for specific storage types"); + QUOTA_BY_STORAGE_TYPE(-63, -61, "Support quota for specific storage types"), + ERASURE_CODING(-64, -61, "Support erasure coding"); private final FeatureInfo info; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 6b7e8cf..bee06a5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -84,6 +84,7 @@ import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.ErasureCodingZone; import org.apache.hadoop.hdfs.protocol.EncryptionZone; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.FSLimitException; @@ -143,6 +144,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RetriableException; @@ -794,7 +796,7 @@ class NameNodeRpcServer implements NamenodeProtocols { public LocatedBlock updateBlockForPipeline(ExtendedBlock block, String clientName) throws IOException { checkNNStartup(); - return namesystem.updateBlockForPipeline(block, clientName); + return namesystem.bumpBlockGenerationStamp(block, clientName); } @@ -1847,6 +1849,24 @@ class NameNodeRpcServer implements NamenodeProtocols { } @Override // ClientProtocol + public void createErasureCodingZone(String src, ECSchema schema, int cellSize) + throws IOException { + checkNNStartup(); + final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + if (cacheEntry != null && cacheEntry.isSuccess()) { + return; + } + boolean success = false; + try { + namesystem.createErasureCodingZone(src, schema, cellSize, + cacheEntry != null); + success = true; + } finally { + RetryCache.setState(cacheEntry, success); + } + } + + @Override // ClientProtocol public void setXAttr(String src, XAttr xAttr, EnumSet flag) throws IOException { checkNNStartup(); @@ -2041,4 +2061,16 @@ class NameNodeRpcServer implements NamenodeProtocols { namesystem.checkSuperuserPrivilege(); nn.spanReceiverHost.removeSpanReceiver(id); } + + @Override // ClientProtocol + public ECSchema[] getECSchemas() throws IOException { + checkNNStartup(); + return namesystem.getErasureCodingSchemas(); + } + + @Override // ClientProtocol + public ErasureCodingZone getErasureCodingZone(String src) throws IOException { + checkNNStartup(); + return namesystem.getErasureCodingZone(src); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java index 7d4cd7e..21062e2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java @@ -65,8 +65,9 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicies; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; @@ -123,6 +124,9 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { private final int totalDatanodes; private final InetAddress remoteAddress; + private long totalDirs = 0L; + private long totalSymlinks = 0L; + private String lostFound = null; private boolean lfInited = false; private boolean lfInitedOk = false; @@ -170,7 +174,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { private final PrintWriter out; private List snapshottableDirs = null; - private final BlockPlacementPolicy bpPolicy; + private final BlockPlacementPolicies bpPolicies; private StoragePolicySummary storageTypeSummary = null; /** @@ -192,7 +196,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { this.out = out; this.totalDatanodes = totalDatanodes; this.remoteAddress = remoteAddress; - this.bpPolicy = BlockPlacementPolicy.getInstance(conf, null, + this.bpPolicies = new BlockPlacementPolicies(conf, null, networktopology, namenode.getNamesystem().getBlockManager().getDatanodeManager() .getHost2DatanodeMap()); @@ -255,7 +259,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { out.println("Block Id: " + blockId); out.println("Block belongs to: "+iNode.getFullPathName()); out.println("No. of Expected Replica: " + - bc.getPreferredBlockReplication()); + bm.getExpectedReplicaNum(bc, blockInfo)); out.println("No. of live Replica: " + numberReplicas.liveReplicas()); out.println("No. of excess Replica: " + numberReplicas.excessReplicas()); out.println("No. of stale Replica: " + @@ -356,13 +360,21 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { namenode.getNamesystem().getBlockManager().getStoragePolicies()); } - Result res = new Result(conf); + Result replRes = new ReplicationResult(conf); + Result ecRes = new ErasureCodingResult(conf); - check(path, file, res); + check(path, file, replRes, ecRes); - out.println(res); - out.println(" Number of data-nodes:\t\t" + totalDatanodes); + out.print("\nStatus: "); + out.println(replRes.isHealthy() && ecRes.isHealthy() ? "HEALTHY" : "CORRUPT"); + out.println(" Number of data-nodes:\t" + totalDatanodes); out.println(" Number of racks:\t\t" + networktopology.getNumOfRacks()); + out.println(" Total dirs:\t\t\t" + totalDirs); + out.println(" Total symlinks:\t\t" + totalSymlinks); + out.println("\nReplicated Blocks:"); + out.println(replRes); + out.println("\nErasure Coded Block Groups:"); + out.println(ecRes); if (this.showStoragePolcies) { out.print(storageTypeSummary.toString()); @@ -382,7 +394,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { // of file system and return appropriate code. 
Changing the output // string might break testcases. Also note this must be the last line // of the report. - if (res.isHealthy()) { + if (replRes.isHealthy() && ecRes.isHealthy()) { out.print("\n\nThe filesystem under path '" + path + "' " + HEALTHY_STATUS); } else { out.print("\n\nThe filesystem under path '" + path + "' " + CORRUPT_STATUS); @@ -425,42 +437,49 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { } @VisibleForTesting - void check(String parent, HdfsFileStatus file, Result res) throws IOException { + void check(String parent, HdfsFileStatus file, Result replRes, Result ecRes) + throws IOException { String path = file.getFullName(parent); if (file.isDir()) { - checkDir(path, res); + checkDir(path, replRes, ecRes); return; } if (file.isSymlink()) { if (showFiles) { out.println(path + " "); } - res.totalSymlinks++; + totalSymlinks++; return; } LocatedBlocks blocks = getBlockLocations(path, file); if (blocks == null) { // the file is deleted return; } - collectFileSummary(path, file, res, blocks); - collectBlocksSummary(parent, file, res, blocks); + + final Result r = file.getReplication() == 0? ecRes: replRes; + collectFileSummary(path, file, r, blocks); + if (showprogress && (replRes.totalFiles + ecRes.totalFiles) % 100 == 0) { + out.println(); + out.flush(); + } + collectBlocksSummary(parent, file, r, blocks); } - private void checkDir(String path, Result res) throws IOException { + private void checkDir(String path, Result replRes, Result ecRes) throws IOException { if (snapshottableDirs != null && snapshottableDirs.contains(path)) { String snapshotPath = (path.endsWith(Path.SEPARATOR) ? path : path + Path.SEPARATOR) + HdfsConstants.DOT_SNAPSHOT_DIR; HdfsFileStatus snapshotFileInfo = namenode.getRpcServer().getFileInfo( snapshotPath); - check(snapshotPath, snapshotFileInfo, res); + check(snapshotPath, snapshotFileInfo, replRes, ecRes); } byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME; DirectoryListing thisListing; if (showFiles) { out.println(path + " "); } - res.totalDirs++; + totalDirs++; do { assert lastReturnedName != null; thisListing = namenode.getRpcServer().getListing( @@ -470,7 +489,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { } HdfsFileStatus[] files = thisListing.getPartialListing(); for (int i = 0; i < files.length; i++) { - check(path, files[i], res); + check(path, files[i], replRes, ecRes); } lastReturnedName = thisListing.getLastName(); } while (thisListing.hasMore()); @@ -518,10 +537,6 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { } else if (showprogress) { out.print('.'); } - if ((showprogress) && res.totalFiles % 100 == 0) { - out.println(); - out.flush(); - } } private void collectBlocksSummary(String parent, HdfsFileStatus file, Result res, @@ -542,9 +557,10 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { final BlockInfo storedBlock = bm.getStoredBlock( block.getLocalBlock()); + final int minReplication = bm.getMinStorageNum(storedBlock); // count decommissionedReplicas / decommissioningReplicas NumberReplicas numberReplicas = bm.countNodes(storedBlock); - int decommissionedReplicas = numberReplicas.decommissioned();; + int decommissionedReplicas = numberReplicas.decommissioned(); int decommissioningReplicas = numberReplicas.decommissioning(); res.decommissionedReplicas += decommissionedReplicas; res.decommissioningReplicas += decommissioningReplicas; @@ -556,11 +572,18 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { res.totalReplicas += 
totalReplicasPerBlock; // count expected replicas - short targetFileReplication = file.getReplication(); + short targetFileReplication; + if (file.getECSchema() != null) { + assert storedBlock instanceof BlockInfoStriped; + targetFileReplication = ((BlockInfoStriped) storedBlock) + .getRealTotalBlockNum(); + } else { + targetFileReplication = file.getReplication(); + } res.numExpectedReplicas += targetFileReplication; // count under min repl'd blocks - if(totalReplicasPerBlock < res.minReplication){ + if(totalReplicasPerBlock < minReplication){ res.numUnderMinReplicatedBlocks++; } @@ -581,7 +604,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { } // count minimally replicated blocks - if (totalReplicasPerBlock >= res.minReplication) + if (totalReplicasPerBlock >= minReplication) res.numMinReplicatedBlocks++; // count missing replicas / under replicated blocks @@ -601,7 +624,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { } // count mis replicated blocks - BlockPlacementStatus blockPlacementStatus = bpPolicy + BlockPlacementStatus blockPlacementStatus = bpPolicies.getPolicy(false) .verifyBlockPlacement(path, lBlk, targetFileReplication); if (!blockPlacementStatus.isPlacementPolicySatisfied()) { res.numMisReplicatedBlocks++; @@ -636,9 +659,9 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { report.append(" Live_repl=" + liveReplicas); if (showLocations || showRacks || showReplicaDetails) { StringBuilder sb = new StringBuilder("["); - Iterable storages = bm.getStorages(block.getLocalBlock()); - for (Iterator iterator = storages.iterator(); iterator.hasNext();) { - DatanodeStorageInfo storage = iterator.next(); + DatanodeStorageInfo[] storages = bm.getStorages(storedBlock); + for (int i = 0; i < storages.length; i++) { + DatanodeStorageInfo storage = storages[i]; DatanodeDescriptor dnDesc = storage.getDatanodeDescriptor(); if (showRacks) { sb.append(NodeBase.getPath(dnDesc)); @@ -647,7 +670,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { .getStorageType())); } if (showReplicaDetails) { - LightWeightLinkedSet blocksExcess = + LightWeightLinkedSet blocksExcess = bm.excessReplicateMap.get(dnDesc.getDatanodeUuid()); Collection corruptReplicas = bm.getCorruptReplicas(block.getLocalBlock()); @@ -668,7 +691,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { sb.append("LIVE)"); } } - if (iterator.hasNext()) { + if (i < storages.length - 1) { sb.append(", "); } } @@ -982,7 +1005,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { long missingReplicas = 0L; long decommissionedReplicas = 0L; long decommissioningReplicas = 0L; - long numUnderMinReplicatedBlocks=0L; + long numUnderMinReplicatedBlocks = 0L; long numOverReplicatedBlocks = 0L; long numUnderReplicatedBlocks = 0L; long numMisReplicatedBlocks = 0L; // blocks that do not satisfy block placement policy @@ -992,22 +1015,10 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { long totalOpenFilesBlocks = 0L; long totalFiles = 0L; long totalOpenFiles = 0L; - long totalDirs = 0L; - long totalSymlinks = 0L; long totalSize = 0L; long totalOpenFilesSize = 0L; long totalReplicas = 0L; - final short replication; - final int minReplication; - - Result(Configuration conf) { - this.replication = (short)conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, - DFSConfigKeys.DFS_REPLICATION_DEFAULT); - this.minReplication = (short)conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, - DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT); - 
} - /** * DFS is considered healthy if there are no missing blocks. */ @@ -1033,19 +1044,29 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { return 0.0f; return (float) (totalReplicas) / (float) totalBlocks; } + } + + @VisibleForTesting + static class ReplicationResult extends Result { + final short replication; + final short minReplication; + + ReplicationResult(Configuration conf) { + this.replication = (short)conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, + DFSConfigKeys.DFS_REPLICATION_DEFAULT); + this.minReplication = (short)conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT); + } @Override public String toString() { StringBuilder res = new StringBuilder(); - res.append("Status: ").append((isHealthy() ? "HEALTHY" : "CORRUPT")) - .append("\n Total size:\t").append(totalSize).append(" B"); + res.append(" Total size:\t").append(totalSize).append(" B"); if (totalOpenFilesSize != 0) { res.append(" (Total open files size: ").append(totalOpenFilesSize) .append(" B)"); } - res.append("\n Total dirs:\t").append(totalDirs).append( - "\n Total files:\t").append(totalFiles); - res.append("\n Total symlinks:\t\t").append(totalSymlinks); + res.append("\n Total files:\t").append(totalFiles); if (totalOpenFiles != 0) { res.append(" (Files currently being written: ").append(totalOpenFiles) .append(")"); @@ -1135,4 +1156,110 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { return res.toString(); } } + + @VisibleForTesting + static class ErasureCodingResult extends Result { + final String defaultSchema; + + ErasureCodingResult(Configuration conf) { + defaultSchema = ErasureCodingSchemaManager.getSystemDefaultSchema() + .getSchemaName(); + } + + @Override + public String toString() { + StringBuilder res = new StringBuilder(); + res.append(" Total size:\t").append(totalSize).append(" B"); + if (totalOpenFilesSize != 0) { + res.append(" (Total open files size: ").append(totalOpenFilesSize) + .append(" B)"); + } + res.append("\n Total files:\t").append(totalFiles); + if (totalOpenFiles != 0) { + res.append(" (Files currently being written: ").append(totalOpenFiles) + .append(")"); + } + res.append("\n Total block groups (validated):\t").append(totalBlocks); + if (totalBlocks > 0) { + res.append(" (avg. 
block group size ").append((totalSize / totalBlocks)) + .append(" B)"); + } + if (totalOpenFilesBlocks != 0) { + res.append(" (Total open file block groups (not validated): ").append( + totalOpenFilesBlocks).append(")"); + } + if (corruptFiles > 0 || numUnderMinReplicatedBlocks > 0) { + res.append("\n ********************************"); + if(numUnderMinReplicatedBlocks>0){ + res.append("\n UNRECOVERABLE BLOCK GROUPS:\t").append(numUnderMinReplicatedBlocks); + if(totalBlocks>0){ + res.append(" (").append( + ((float) (numUnderMinReplicatedBlocks * 100) / (float) totalBlocks)) + .append(" %)"); + } + } + if(corruptFiles>0) { + res.append( + "\n CORRUPT FILES:\t").append(corruptFiles); + if (missingSize > 0) { + res.append("\n MISSING BLOCK GROUPS:\t").append(missingIds.size()).append( + "\n MISSING SIZE:\t\t").append(missingSize).append(" B"); + } + if (corruptBlocks > 0) { + res.append("\n CORRUPT BLOCK GROUPS: \t").append(corruptBlocks).append( + "\n CORRUPT SIZE:\t\t").append(corruptSize).append(" B"); + } + } + res.append("\n ********************************"); + } + res.append("\n Minimally erasure-coded block groups:\t").append( + numMinReplicatedBlocks); + if (totalBlocks > 0) { + res.append(" (").append( + ((float) (numMinReplicatedBlocks * 100) / (float) totalBlocks)) + .append(" %)"); + } + res.append("\n Over-erasure-coded block groups:\t") + .append(numOverReplicatedBlocks); + if (totalBlocks > 0) { + res.append(" (").append( + ((float) (numOverReplicatedBlocks * 100) / (float) totalBlocks)) + .append(" %)"); + } + res.append("\n Under-erasure-coded block groups:\t").append( + numUnderReplicatedBlocks); + if (totalBlocks > 0) { + res.append(" (").append( + ((float) (numUnderReplicatedBlocks * 100) / (float) totalBlocks)) + .append(" %)"); + } + res.append("\n Unsatisfactory placement block groups:\t") + .append(numMisReplicatedBlocks); + if (totalBlocks > 0) { + res.append(" (").append( + ((float) (numMisReplicatedBlocks * 100) / (float) totalBlocks)) + .append(" %)"); + } + res.append("\n Default schema:\t\t").append(defaultSchema) + .append("\n Average block group size:\t").append( + getReplicationFactor()).append("\n Missing block groups:\t\t").append( + missingIds.size()).append("\n Corrupt block groups:\t\t").append( + corruptBlocks).append("\n Missing internal blocks:\t").append( + missingReplicas); + if (totalReplicas > 0) { + res.append(" (").append( + ((float) (missingReplicas * 100) / (float) numExpectedReplicas)).append( + " %)"); + } + if (decommissionedReplicas > 0) { + res.append("\n Decommissioned internal blocks:\t").append( + decommissionedReplicas); + } + if (decommissioningReplicas > 0) { + res.append("\n Decommissioning internal blocks:\t").append( + decommissioningReplicas); + } + return res.toString(); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java index a5053bc..430853a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java @@ -17,8 +17,12 @@ */ package 
org.apache.hadoop.hdfs.server.namenode; +import java.io.IOException; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ErasureCodingZone; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; import org.apache.hadoop.hdfs.util.RwLock; @@ -45,7 +49,16 @@ public interface Namesystem extends RwLock, SafeMode { void checkOperation(OperationCategory read) throws StandbyException; - boolean isInSnapshot(BlockInfoContiguousUnderConstruction blockUC); + boolean isInSnapshot(BlockCollection bc); CacheManager getCacheManager(); + + /** + * Gets the ECZone for path + * @param src the filesystem path + * @return {@link ErasureCodingZone} + * @throws IOException + */ + ErasureCodingZone getErasureCodingZoneForPath(String src) + throws IOException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java index 1428482..252844c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java @@ -44,9 +44,10 @@ public interface SafeMode { /** * Increment number of blocks that reached minimal replication. - * @param replication current replication + * @param replication current replication + * @param storedBlock current stored Block */ - public void incrementSafeBlockCount(int replication); + public void incrementSafeBlockCount(int replication, BlockInfo storedBlock); /** Decrement number of blocks that reached minimal replication. 
*/ public void decrementSafeBlockCount(BlockInfo b); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java index 64ad1f6..e684b41 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java @@ -234,19 +234,23 @@ public class FSImageFormatPBSnapshot { .toByteArray(), permission, acl, fileInPb.getModificationTime(), fileInPb.getAccessTime(), (short) fileInPb.getReplication(), fileInPb.getPreferredBlockSize(), - (byte)fileInPb.getStoragePolicyID(), xAttrs); + (byte)fileInPb.getStoragePolicyID(), xAttrs, + fileInPb.getIsStriped()); } FileDiff diff = new FileDiff(pbf.getSnapshotId(), copy, null, pbf.getFileSize()); List bpl = pbf.getBlocksList(); - BlockInfo[] blocks = new BlockInfo[bpl.size()]; + // in file diff there can only be contiguous blocks + BlockInfoContiguous[] blocks = new BlockInfoContiguous[bpl.size()]; for(int j = 0, e = bpl.size(); j < e; ++j) { Block blk = PBHelper.convert(bpl.get(j)); - BlockInfo storedBlock = fsn.getBlockManager().getStoredBlock(blk); + BlockInfoContiguous storedBlock = + (BlockInfoContiguous) fsn.getBlockManager().getStoredBlock(blk); if(storedBlock == null) { - storedBlock = fsn.getBlockManager().addBlockCollection( - new BlockInfoContiguous(blk, copy.getFileReplication()), file); + storedBlock = (BlockInfoContiguous) fsn.getBlockManager() + .addBlockCollectionWithCheck(new BlockInfoContiguous(blk, + copy.getFileReplication()), file); } blocks[j] = storedBlock; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiffList.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiffList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiffList.java index 6b8388e..0a64a38 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiffList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiffList.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.INode; import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; @@ -55,7 +56,9 @@ public class FileDiffList extends final FileDiff diff = super.saveSelf2Snapshot(latestSnapshotId, iNodeFile, snapshotCopy); if (withBlocks) { // Store blocks if this is the first update - 
diff.setBlocks(iNodeFile.getBlocks()); + BlockInfo[] blks = iNodeFile.getBlocks(); + assert blks != null; + diff.setBlocks(blks); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java new file mode 100644 index 0000000..56a1546 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocol; + +import com.google.common.base.Joiner; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; +import org.apache.hadoop.io.erasurecode.ECSchema; + +import java.util.Arrays; +import java.util.Collection; + +/** + * A BlockECRecoveryCommand is an instruction to a DataNode to reconstruct a + * striped block group with missing blocks. + * + * Upon receiving this command, the DataNode pulls data from other DataNodes + * hosting blocks in this group and reconstructs the lost blocks through codec + * calculation. + * + * After the reconstruction, the DataNode pushes the reconstructed blocks to + * their final destinations if necessary (e.g., the destination is different + * from the reconstruction node, or multiple blocks in a group are to be + * reconstructed). 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class BlockECRecoveryCommand extends DatanodeCommand { + final Collection ecTasks; + + /** + * Create BlockECRecoveryCommand from a collection of + * {@link BlockECRecoveryInfo}, each representing a recovery task + */ + public BlockECRecoveryCommand(int action, + Collection blockECRecoveryInfoList) { + super(action); + this.ecTasks = blockECRecoveryInfoList; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("BlockECRecoveryCommand(\n "); + Joiner.on("\n ").appendTo(sb, ecTasks); + sb.append("\n)"); + return sb.toString(); + } + + /** Block and targets pair */ + @InterfaceAudience.Private + @InterfaceStability.Evolving + public static class BlockECRecoveryInfo { + private final ExtendedBlock block; + private final DatanodeInfo[] sources; + private DatanodeInfo[] targets; + private String[] targetStorageIDs; + private StorageType[] targetStorageTypes; + private final short[] liveBlockIndices; + private final ECSchema ecSchema; + private final int cellSize; + + public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources, + DatanodeStorageInfo[] targetDnStorageInfo, short[] liveBlockIndices, + ECSchema ecSchema, int cellSize) { + this(block, sources, DatanodeStorageInfo + .toDatanodeInfos(targetDnStorageInfo), DatanodeStorageInfo + .toStorageIDs(targetDnStorageInfo), DatanodeStorageInfo + .toStorageTypes(targetDnStorageInfo), liveBlockIndices, ecSchema, + cellSize); + } + + public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources, + DatanodeInfo[] targets, String[] targetStorageIDs, + StorageType[] targetStorageTypes, short[] liveBlockIndices, + ECSchema ecSchema, int cellSize) { + this.block = block; + this.sources = sources; + this.targets = targets; + this.targetStorageIDs = targetStorageIDs; + this.targetStorageTypes = targetStorageTypes; + this.liveBlockIndices = liveBlockIndices; + this.ecSchema = ecSchema; + this.cellSize = cellSize; + } + + public ExtendedBlock getExtendedBlock() { + return block; + } + + public DatanodeInfo[] getSourceDnInfos() { + return sources; + } + + public DatanodeInfo[] getTargetDnInfos() { + return targets; + } + + public String[] getTargetStorageIDs() { + return targetStorageIDs; + } + + public StorageType[] getTargetStorageTypes() { + return targetStorageTypes; + } + + public short[] getLiveBlockIndices() { + return liveBlockIndices; + } + + public ECSchema getECSchema() { + return ecSchema; + } + + public int getCellSize() { + return cellSize; + } + + @Override + public String toString() { + return new StringBuilder().append("BlockECRecoveryInfo(\n ") + .append("Recovering ").append(block).append(" From: ") + .append(Arrays.asList(sources)).append(" To: [") + .append(Arrays.asList(targets)).append(")\n") + .append(" Block Indices: ").append(Arrays.asList(liveBlockIndices)) + .toString(); + } + } + + public Collection getECTasks() { + return this.ecTasks; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java index a985dbd..0507faf 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.protocol; +import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.StorageType; @@ -91,6 +92,30 @@ public class BlocksWithLocations { } } + public static class StripedBlockWithLocations extends BlockWithLocations { + final byte[] indices; + final short dataBlockNum; + + public StripedBlockWithLocations(BlockWithLocations blk, byte[] indices, + short dataBlockNum) { + super(blk.getBlock(), blk.getDatanodeUuids(), blk.getStorageIDs(), + blk.getStorageTypes()); + Preconditions.checkArgument( + blk.getDatanodeUuids().length == indices.length); + this.indices = indices; + this.dataBlockNum = dataBlockNum; + + } + + public byte[] getIndices() { + return indices; + } + + public short getDataBlockNum() { + return dataBlockNum; + } + } + private final BlockWithLocations[] blocks; /** Constructor with one parameter */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java index dfe0813..add4e73 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java @@ -76,6 +76,7 @@ public interface DatanodeProtocol { final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // update balancer bandwidth final static int DNA_CACHE = 9; // cache blocks final static int DNA_UNCACHE = 10; // uncache blocks + final static int DNA_ERASURE_CODING_RECOVERY = 11; // erasure coding recovery command /** * Register Datanode. http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/erasurecode/ECCli.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/erasurecode/ECCli.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/erasurecode/ECCli.java new file mode 100644 index 0000000..4ed9d0a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/erasurecode/ECCli.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdfs.tools.erasurecode; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FsShell; +import org.apache.hadoop.fs.shell.CommandFactory; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.util.ToolRunner; + +/** + * CLI for the erasure code encoding operations. + */ +public class ECCli extends FsShell { + + private final static String usagePrefix = + "Usage: hdfs erasurecode [generic options]"; + + @Override + protected String getUsagePrefix() { + return usagePrefix; + } + + @Override + protected void registerCommands(CommandFactory factory) { + factory.registerCommands(ECCommand.class); + } + + public static void main(String[] args) throws Exception { + Configuration conf = new HdfsConfiguration(); + int res = ToolRunner.run(conf, new ECCli(), args); + System.exit(res); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/erasurecode/ECCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/erasurecode/ECCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/erasurecode/ECCommand.java new file mode 100644 index 0000000..03026d8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/erasurecode/ECCommand.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.hdfs.tools.erasurecode; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; + +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.shell.Command; +import org.apache.hadoop.fs.shell.CommandFactory; +import org.apache.hadoop.fs.shell.PathData; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.protocol.ErasureCodingZone; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException; +import org.apache.hadoop.io.erasurecode.ECSchema; +import org.apache.hadoop.util.StringUtils; + +/** + * Erasure Coding CLI commands + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class ECCommand extends Command { + + public static void registerCommands(CommandFactory factory) { + // Register all commands of Erasure CLI, with a '-' at the beginning in name + // of the command. + factory.addClass(CreateECZoneCommand.class, "-" + CreateECZoneCommand.NAME); + factory.addClass(GetECZoneCommand.class, "-" + + GetECZoneCommand.NAME); + factory.addClass(ListECSchemas.class, "-" + ListECSchemas.NAME); + } + + @Override + public String getCommandName() { + return getName(); + } + + @Override + protected void run(Path path) throws IOException { + throw new RuntimeException("Not suppose to get here"); + } + + @Deprecated + @Override + public int runAll() { + return run(args); + } + + @Override + protected void processPath(PathData item) throws IOException { + if (!(item.fs instanceof DistributedFileSystem)) { + throw new UnsupportedActionException( + "Erasure commands are only supported for the HDFS paths"); + } + } + + /** + * Create EC encoding zone command. Zones are created to use specific EC + * encoding schema, other than default while encoding the files under some + * specific directory. + */ + static class CreateECZoneCommand extends ECCommand { + public static final String NAME = "createZone"; + public static final String USAGE = "[-s ] [-c ] "; + public static final String DESCRIPTION = + "Create a zone to encode files using a specified schema\n" + + "Options :\n" + + " -s : EC schema name to encode files. " + + "If not passed default schema will be used\n" + + " -c : cell size to use for striped encoding files." + + " If not passed default cellsize of " + + HdfsConstants.BLOCK_STRIPED_CELL_SIZE + " will be used\n" + + " : Path to an empty directory. 
Under this directory " + + "files will be encoded using specified schema"; + private String schemaName; + private int cellSize = 0; + private ECSchema schema = null; + + @Override + protected void processOptions(LinkedList args) throws IOException { + schemaName = StringUtils.popOptionWithArgument("-s", args); + String cellSizeStr = StringUtils.popOptionWithArgument("-c", args); + if (cellSizeStr != null) { + cellSize = (int) StringUtils.TraditionalBinaryPrefix + .string2long(cellSizeStr); + } + if (args.isEmpty()) { + throw new HadoopIllegalArgumentException(" is missing"); + } + if (args.size() > 1) { + throw new HadoopIllegalArgumentException("Too many arguments"); + } + } + + @Override + protected void processPath(PathData item) throws IOException { + super.processPath(item); + DistributedFileSystem dfs = (DistributedFileSystem) item.fs; + try { + if (schemaName != null) { + ECSchema[] ecSchemas = dfs.getClient().getECSchemas(); + for (ECSchema ecSchema : ecSchemas) { + if (schemaName.equals(ecSchema.getSchemaName())) { + schema = ecSchema; + break; + } + } + if (schema == null) { + StringBuilder sb = new StringBuilder(); + sb.append("Schema '"); + sb.append(schemaName); + sb.append("' does not match any of the supported schemas."); + sb.append(" Please select any one of "); + List schemaNames = new ArrayList(); + for (ECSchema ecSchema : ecSchemas) { + schemaNames.add(ecSchema.getSchemaName()); + } + sb.append(schemaNames); + throw new HadoopIllegalArgumentException(sb.toString()); + } + } + dfs.createErasureCodingZone(item.path, schema, cellSize); + out.println("EC Zone created successfully at " + item.path); + } catch (IOException e) { + throw new IOException("Unable to create EC zone for the path " + + item.path + ". " + e.getMessage()); + } + } + } + + /** + * Get the information about the zone + */ + static class GetECZoneCommand extends ECCommand { + public static final String NAME = "getZone"; + public static final String USAGE = ""; + public static final String DESCRIPTION = + "Get information about the EC zone at specified path\n"; + + @Override + protected void processOptions(LinkedList args) throws IOException { + if (args.isEmpty()) { + throw new HadoopIllegalArgumentException(" is missing"); + } + if (args.size() > 1) { + throw new HadoopIllegalArgumentException("Too many arguments"); + } + } + + @Override + protected void processPath(PathData item) throws IOException { + super.processPath(item); + DistributedFileSystem dfs = (DistributedFileSystem) item.fs; + try { + ErasureCodingZone ecZone = dfs.getErasureCodingZone(item.path); + if (ecZone != null) { + out.println(ecZone.toString()); + } else { + out.println("Path " + item.path + " is not in EC zone"); + } + } catch (IOException e) { + throw new IOException("Unable to get EC zone for the path " + + item.path + ". 
" + e.getMessage()); + } + } + } + + /** + * List all supported EC Schemas + */ + static class ListECSchemas extends ECCommand { + public static final String NAME = "listSchemas"; + public static final String USAGE = ""; + public static final String DESCRIPTION = + "Get the list of ECSchemas supported\n"; + + @Override + protected void processOptions(LinkedList args) throws IOException { + if (!args.isEmpty()) { + throw new HadoopIllegalArgumentException("Too many parameters"); + } + + FileSystem fs = FileSystem.get(getConf()); + if (fs instanceof DistributedFileSystem == false) { + throw new UnsupportedActionException( + "Erasure commands are only supported for the HDFS"); + } + DistributedFileSystem dfs = (DistributedFileSystem) fs; + + ECSchema[] ecSchemas = dfs.getClient().getECSchemas(); + StringBuilder sb = new StringBuilder(); + int i = 0; + while (i < ecSchemas.length) { + ECSchema ecSchema = ecSchemas[i]; + sb.append(ecSchema.getSchemaName()); + i++; + if (i < ecSchemas.length) { + sb.append(", "); + } + } + out.println(sb.toString()); + } + } +}