From: zhz@apache.org
To: common-commits@hadoop.apache.org
Date: Fri, 14 Aug 2015 17:55:35 -0000
Subject: [23/36] hadoop git commit: HDFS-7285. Erasure Coding Support inside HDFS.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index 3d30a19..4f524ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -44,10 +45,13 @@ import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.util.ChunkedArrayList;
@@ -73,7 +77,7 @@ class FSDirWriteFileOp {
       Block block) throws IOException {
     // modify file-> block and blocksMap
     // fileNode should be under construction
-    BlockInfoContiguousUnderConstruction uc = fileNode.removeLastBlock(block);
+    BlockInfoUnderConstruction uc = fileNode.removeLastBlock(block);
     if (uc == null) {
       return false;
     }
@@ -87,7 +91,7 @@ class FSDirWriteFileOp {
 
     // update space consumed
     fsd.updateCount(iip, 0, -fileNode.getPreferredBlockSize(),
-                    fileNode.getPreferredBlockReplication(), true);
+        fileNode.getPreferredBlockReplication(), true);
     return true;
   }
 
@@ -167,9 +171,10 @@ class FSDirWriteFileOp {
       String src, long fileId, String clientName, ExtendedBlock previous,
       LocatedBlock[] onRetryBlock) throws IOException {
     final long blockSize;
-    final int replication;
+    final short numTargets;
     final byte storagePolicyID;
     String clientMachine;
+    final boolean isStriped;
 
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     src = fsn.dir.resolvePath(pc, src, pathComponents);
@@ -195,18 +200,21 @@ class FSDirWriteFileOp {
     blockSize = pendingFile.getPreferredBlockSize();
     clientMachine = pendingFile.getFileUnderConstructionFeature()
         .getClientMachine();
-    replication = pendingFile.getFileReplication();
+    isStriped = pendingFile.isStriped();
+    numTargets = isStriped ?
+        HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS :
+        pendingFile.getFileReplication();
     storagePolicyID = pendingFile.getStoragePolicyID();
-    return new ValidateAddBlockResult(blockSize, replication, storagePolicyID,
-        clientMachine);
+    return new ValidateAddBlockResult(blockSize, numTargets, storagePolicyID,
+        clientMachine, isStriped);
   }
 
-  static LocatedBlock makeLocatedBlock(FSNamesystem fsn, Block blk,
+  static LocatedBlock makeLocatedBlock(FSNamesystem fsn, BlockInfo blk,
       DatanodeStorageInfo[] locs, long offset) throws IOException {
     LocatedBlock lBlk = BlockManager.newLocatedBlock(fsn.getExtendedBlock(blk),
-        locs, offset, false);
-    fsn.getFSDirectory().getBlockManager()
-        .setBlockToken(lBlk, BlockTokenIdentifier.AccessMode.WRITE);
+        blk, locs, offset);
+    fsn.getBlockManager().setBlockToken(lBlk,
+        BlockTokenIdentifier.AccessMode.WRITE);
     return lBlk;
   }
 
@@ -236,8 +244,9 @@ class FSDirWriteFileOp {
     } else {
       // add new chosen targets to already allocated block and return
       BlockInfo lastBlockInFile = pendingFile.getLastBlock();
-      ((BlockInfoContiguousUnderConstruction) lastBlockInFile)
-          .setExpectedLocations(targets);
+      final BlockInfoUnderConstruction uc
+          = (BlockInfoUnderConstruction)lastBlockInFile;
+      uc.setExpectedLocations(targets);
       offset = pendingFile.computeFileSize();
       return makeLocatedBlock(fsn, lastBlockInFile, targets, offset);
     }
@@ -248,15 +257,17 @@ class FSDirWriteFileOp {
         ExtendedBlock.getLocalBlock(previous));
 
     // allocate new block, record block locations in INode.
-    Block newBlock = fsn.createNewBlock();
+    final boolean isStriped = pendingFile.isStriped();
+    // allocate new block, record block locations in INode.
+    Block newBlock = fsn.createNewBlock(isStriped);
     INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile);
-    saveAllocatedBlock(fsn, src, inodesInPath, newBlock, targets);
+    saveAllocatedBlock(fsn, src, inodesInPath, newBlock, targets, isStriped);
 
     persistNewBlock(fsn, src, pendingFile);
     offset = pendingFile.computeFileSize();
 
     // Return located block
-    return makeLocatedBlock(fsn, newBlock, targets, offset);
+    return makeLocatedBlock(fsn, fsn.getStoredBlock(newBlock), targets, offset);
   }
 
   static DatanodeStorageInfo[] chooseTargetForNewBlock(
@@ -277,9 +288,10 @@ class FSDirWriteFileOp {
         : Arrays.asList(favoredNodes);
     // choose targets for the new block to be allocated.
-    return bm.chooseTarget4NewBlock(src, r.replication, clientNode,
+    return bm.chooseTarget4NewBlock(src, r.numTargets, clientNode,
                                     excludedNodesSet, r.blockSize,
-                                    favoredNodesList, r.storagePolicyID);
+                                    favoredNodesList, r.storagePolicyID,
+                                    r.isStriped);
   }
 
   /**
@@ -468,22 +480,22 @@ class FSDirWriteFileOp {
       long preferredBlockSize, boolean underConstruction, String clientName,
       String clientMachine, byte storagePolicyId) {
     final INodeFile newNode;
+    Preconditions.checkNotNull(existing);
     assert fsd.hasWriteLock();
-    if (underConstruction) {
-      newNode = newINodeFile(id, permissions, modificationTime,
-          modificationTime, replication,
-          preferredBlockSize,
-          storagePolicyId);
-      newNode.toUnderConstruction(clientName, clientMachine);
-    } else {
-      newNode = newINodeFile(id, permissions, modificationTime,
-          atime, replication,
-          preferredBlockSize,
-          storagePolicyId);
-    }
-
-    newNode.setLocalName(localName);
     try {
+      // check if the file is in an EC zone
+      final boolean isStriped = FSDirErasureCodingOp.isInErasureCodingZone(
+          fsd.getFSNamesystem(), existing);
+      if (underConstruction) {
+        newNode = newINodeFile(id, permissions, modificationTime,
+            modificationTime, replication, preferredBlockSize, storagePolicyId,
+            isStriped);
+        newNode.toUnderConstruction(clientName, clientMachine);
+      } else {
+        newNode = newINodeFile(id, permissions, modificationTime, atime,
+            replication, preferredBlockSize, storagePolicyId, isStriped);
+      }
+      newNode.setLocalName(localName);
       INodesInPath iip = fsd.addINode(existing, newNode);
       if (iip != null) {
         if (aclEntries != null) {
@@ -509,23 +521,38 @@ class FSDirWriteFileOp {
    */
   private static BlockInfo addBlock(
       FSDirectory fsd, String path, INodesInPath inodesInPath, Block block,
-      DatanodeStorageInfo[] targets) throws IOException {
+      DatanodeStorageInfo[] targets, boolean isStriped) throws IOException {
     fsd.writeLock();
     try {
       final INodeFile fileINode = inodesInPath.getLastINode().asFile();
       Preconditions.checkState(fileINode.isUnderConstruction());
 
-      // check quota limits and updated space consumed
-      fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
-          fileINode.getPreferredBlockReplication(), true);
-
-      // associate new last block for the file
-      BlockInfoContiguousUnderConstruction blockInfo =
-        new BlockInfoContiguousUnderConstruction(
-            block,
-            fileINode.getFileReplication(),
-            HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION,
+      final BlockInfo blockInfo;
+      if (isStriped) {
+        ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone(
+            fsd.getFSNamesystem(), inodesInPath);
+        ECSchema ecSchema = ecZone.getSchema();
+        short numDataUnits = (short) ecSchema.getNumDataUnits();
+        short numParityUnits = (short) ecSchema.getNumParityUnits();
+        short numLocations = (short) (numDataUnits + numParityUnits);
+
+        // check quota limits and updated space consumed
+        fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
+            numLocations, true);
+        blockInfo = new BlockInfoStripedUnderConstruction(block, ecSchema,
+            ecZone.getCellSize(),
+            HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets);
+      } else {
+        // check quota limits and updated space consumed
+        fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
+            fileINode.getPreferredBlockReplication(), true);
+
+        short numLocations = fileINode.getFileReplication();
+        blockInfo = new BlockInfoContiguousUnderConstruction(block,
+            numLocations, HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION,
             targets);
+      }
       fsd.getBlockManager().addBlockCollection(blockInfo, fileINode);
       fileINode.addBlock(blockInfo);
 
@@ -551,22 +578,24 @@ class FSDirWriteFileOp {
       String clientName, String clientMachine)
       throws IOException {
+    Preconditions.checkNotNull(existing);
     long modTime = now();
-    INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
-        modTime, modTime, replication, preferredBlockSize);
-    newNode.setLocalName(localName.getBytes(Charsets.UTF_8));
-    newNode.toUnderConstruction(clientName, clientMachine);
-
     INodesInPath newiip;
     fsd.writeLock();
     try {
+      final boolean isStriped = FSDirErasureCodingOp.isInErasureCodingZone(
+          fsd.getFSNamesystem(), existing);
+      INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
+          modTime, modTime, replication, preferredBlockSize, isStriped);
+      newNode.setLocalName(localName.getBytes(Charsets.UTF_8));
+      newNode.toUnderConstruction(clientName, clientMachine);
       newiip = fsd.addINode(existing, newNode);
     } finally {
       fsd.writeUnlock();
     }
     if (newiip == null) {
       NameNode.stateChangeLog.info("DIR* addFile: failed to add " +
-                                   existing.getPath() + "/" + localName);
+          existing.getPath() + "/" + localName);
       return null;
     }
 
@@ -579,7 +608,7 @@ class FSDirWriteFileOp {
   private static FileState analyzeFileState(
       FSNamesystem fsn, String src, long fileId, String clientName,
       ExtendedBlock previous, LocatedBlock[] onRetryBlock)
-          throws IOException {
+      throws IOException {
     assert fsn.hasReadLock();
 
     checkBlock(fsn, previous);
@@ -662,8 +691,8 @@ class FSDirWriteFileOp {
            "allocation of a new block in " + src + ". Returning previously" +
            " allocated block " + lastBlockInFile);
       long offset = file.computeFileSize();
-      BlockInfoContiguousUnderConstruction lastBlockUC =
-          (BlockInfoContiguousUnderConstruction) lastBlockInFile;
+      BlockInfoUnderConstruction lastBlockUC =
+          (BlockInfoUnderConstruction) lastBlockInFile;
       onRetryBlock[0] = makeLocatedBlock(fsn, lastBlockInFile,
           lastBlockUC.getExpectedStorageLocations(), offset);
       return new FileState(file, src, iip);
@@ -688,14 +717,8 @@ class FSDirWriteFileOp {
     checkBlock(fsn, last);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     src = fsn.dir.resolvePath(pc, src, pathComponents);
-    boolean success = completeFileInternal(fsn, src, holder,
-        ExtendedBlock.getLocalBlock(last),
-        fileId);
-    if (success) {
-      NameNode.stateChangeLog.info("DIR* completeFile: " + srcArg
-          + " is closed by " + holder);
-    }
-    return success;
+    return completeFileInternal(fsn, src, holder,
+        ExtendedBlock.getLocalBlock(last), fileId);
   }
 
   private static boolean completeFileInternal(
@@ -760,16 +783,18 @@ class FSDirWriteFileOp {
   private static INodeFile newINodeFile(
       long id, PermissionStatus permissions, long mtime, long atime,
-      short replication, long preferredBlockSize, byte storagePolicyId) {
+      short replication, long preferredBlockSize, byte storagePolicyId,
+      boolean isStriped) {
     return new INodeFile(id, null, permissions, mtime, atime,
         BlockInfo.EMPTY_ARRAY, replication, preferredBlockSize,
-        storagePolicyId);
+        storagePolicyId, isStriped);
   }
 
   private static INodeFile newINodeFile(long id, PermissionStatus permissions,
-      long mtime, long atime, short replication, long preferredBlockSize) {
+      long mtime, long atime, short replication, long preferredBlockSize,
+      boolean isStriped) {
     return newINodeFile(id, permissions, mtime, atime, replication,
-        preferredBlockSize, (byte)0);
+        preferredBlockSize, (byte)0, isStriped);
   }
 
   /**
@@ -797,13 +822,12 @@ class FSDirWriteFileOp {
    * @param targets target datanodes where replicas of the new block is placed
    * @throws QuotaExceededException If addition of block exceeds space quota
    */
-  private static void saveAllocatedBlock(
-      FSNamesystem fsn, String src, INodesInPath inodesInPath, Block newBlock,
-      DatanodeStorageInfo[] targets)
-      throws IOException {
+  private static void saveAllocatedBlock(FSNamesystem fsn, String src,
+      INodesInPath inodesInPath, Block newBlock, DatanodeStorageInfo[] targets,
+      boolean isStriped) throws IOException {
     assert fsn.hasWriteLock();
-    BlockInfo b = addBlock(fsn.dir, src, inodesInPath, newBlock,
-        targets);
+    BlockInfo b = addBlock(fsn.dir, src, inodesInPath, newBlock, targets,
+        isStriped);
     NameNode.stateChangeLog.info("BLOCK* allocate " + b + " for " + src);
     DatanodeStorageInfo.incrementBlocksScheduled(targets);
   }
 
@@ -852,17 +876,19 @@ class FSDirWriteFileOp {
   static class ValidateAddBlockResult {
     final long blockSize;
-    final int replication;
+    final int numTargets;
     final byte storagePolicyID;
     final String clientMachine;
+    final boolean isStriped;
 
     ValidateAddBlockResult(
-        long blockSize, int replication, byte storagePolicyID,
-        String clientMachine) {
+        long blockSize, int numTargets, byte storagePolicyID,
+        String clientMachine, boolean isStriped) {
       this.blockSize = blockSize;
-      this.replication = replication;
+      this.numTargets = numTargets;
       this.storagePolicyID = storagePolicyID;
       this.clientMachine = clientMachine;
+      this.isStriped = isStriped;
     }
   }
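[Editor's note] The core of the FSDirWriteFileOp change is that a striped block group needs one storage target per internal block (data plus parity) rather than one per replica, which is why "replication" becomes "numTargets" above. A minimal standalone sketch of that decision, assuming the RS-6-3 defaults this patch reads from HdfsConstants; the class and constant names below are illustrative, not Hadoop API:

```java
/**
 * Sketch of the target-count decision in validateAddBlock. NUM_DATA and
 * NUM_PARITY stand in for HdfsConstants.NUM_DATA_BLOCKS and
 * HdfsConstants.NUM_PARITY_BLOCKS; the 6+3 values assume the default schema.
 */
class TargetCount {
  static final short NUM_DATA = 6;
  static final short NUM_PARITY = 3;

  static short numTargets(boolean isStriped, short fileReplication) {
    // A striped block group places each internal block on a distinct
    // DataNode, so it needs data + parity targets; a contiguous block
    // needs one target per replica.
    return isStriped ? (short) (NUM_DATA + NUM_PARITY) : fileReplication;
  }

  public static void main(String[] args) {
    System.out.println(numTargets(true, (short) 3));   // 9
    System.out.println(numTargets(false, (short) 3));  // 3
  }
}
```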
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index ccee1ae..ce48595 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -201,6 +201,9 @@ public class FSDirectory implements Closeable {
   @VisibleForTesting
   public final EncryptionZoneManager ezManager;
 
+  @VisibleForTesting
+  public final ErasureCodingZoneManager ecZoneManager;
+
   /**
    * Caches frequently used file names used in {@link INode} to reuse
    * byte[] objects and reduce heap usage.
@@ -292,6 +295,7 @@ public class FSDirectory implements Closeable {
     namesystem = ns;
     this.editLog = ns.getEditLog();
     ezManager = new EncryptionZoneManager(this, conf);
+    ecZoneManager = new ErasureCodingZoneManager(this);
   }
 
   FSNamesystem getFSNamesystem() {
@@ -513,7 +517,7 @@ public class FSDirectory implements Closeable {
     final INodeFile fileINode = iip.getLastINode().asFile();
     EnumCounters<StorageType> typeSpaceDeltas =
       getStorageTypeDeltas(fileINode.getStoragePolicyID(), ssDelta,
-          replication, replication);;
+          replication, replication);
     updateCount(iip, iip.length() - 1,
       new QuotaCounts.Builder().nameSpace(nsDelta).storageSpace(ssDelta * replication).
       typeSpaces(typeSpaceDeltas).build(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 3dd076d..246fcd8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -36,16 +36,20 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
@@ -413,7 +417,9 @@ public class FSEditLogLoader {
       // Update the salient file attributes.
       newFile.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID);
       newFile.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
-      updateBlocks(fsDir, addCloseOp, iip, newFile);
+      ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone(
+          fsDir.getFSNamesystem(), iip);
+      updateBlocks(fsDir, addCloseOp, iip, newFile, ecZone);
       break;
     }
     case OP_CLOSE: {
@@ -433,7 +439,9 @@ public class FSEditLogLoader {
       // Update the salient file attributes.
       file.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID);
       file.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
-      updateBlocks(fsDir, addCloseOp, iip, file);
+      ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone(
+          fsDir.getFSNamesystem(), iip);
+      updateBlocks(fsDir, addCloseOp, iip, file, ecZone);
 
       // Now close the file
       if (!file.isUnderConstruction() &&
@@ -491,8 +499,10 @@ public class FSEditLogLoader {
       INodesInPath iip = fsDir.getINodesInPath(path, true);
       INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path);
       // Update in-memory data structures
-      updateBlocks(fsDir, updateOp, iip, oldFile);
-
+      ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone(
+          fsDir.getFSNamesystem(), iip);
+      updateBlocks(fsDir, updateOp, iip, oldFile, ecZone);
+
       if (toAddRetryCache) {
         fsNamesys.addCacheEntry(updateOp.rpcClientId, updateOp.rpcCallId);
       }
@@ -505,9 +515,12 @@ public class FSEditLogLoader {
         FSNamesystem.LOG.debug(op.opCode + ": " + path +
             " new block id : " + addBlockOp.getLastBlock().getBlockId());
       }
-      INodeFile oldFile = INodeFile.valueOf(fsDir.getINode(path), path);
+      INodesInPath iip = fsDir.getINodesInPath(path, true);
+      INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path);
       // add the new block to the INodeFile
-      addNewBlock(fsDir, addBlockOp, oldFile);
+      ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone(
+          fsDir.getFSNamesystem(), iip);
+      addNewBlock(addBlockOp, oldFile, ecZone);
       break;
     }
     case OP_SET_REPLICATION: {
@@ -787,8 +800,15 @@ public class FSEditLogLoader {
     }
     case OP_ALLOCATE_BLOCK_ID: {
       AllocateBlockIdOp allocateBlockIdOp = (AllocateBlockIdOp) op;
-      fsNamesys.getBlockIdManager().setLastAllocatedBlockId(
-          allocateBlockIdOp.blockId);
+      if (BlockIdManager.isStripedBlockID(allocateBlockIdOp.blockId)) {
+        // ALLOCATE_BLOCK_ID is added for sequential block id, thus if the id
+        // is negative, it must belong to striped blocks
+        fsNamesys.getBlockIdManager().setLastAllocatedStripedBlockId(
+            allocateBlockIdOp.blockId);
+      } else {
+        fsNamesys.getBlockIdManager().setLastAllocatedContiguousBlockId(
+            allocateBlockIdOp.blockId);
+      }
       break;
     }
     case OP_ROLLING_UPGRADE_START: {
@@ -941,16 +961,16 @@ public class FSEditLogLoader {
   /**
    * Add a new block into the given INodeFile
    */
-  private void addNewBlock(FSDirectory fsDir, AddBlockOp op, INodeFile file)
-      throws IOException {
+  private void addNewBlock(AddBlockOp op, INodeFile file,
+      ErasureCodingZone ecZone) throws IOException {
     BlockInfo[] oldBlocks = file.getBlocks();
     Block pBlock = op.getPenultimateBlock();
     Block newBlock= op.getLastBlock();
 
     if (pBlock != null) { // the penultimate block is not null
-      Preconditions.checkState(oldBlocks != null && oldBlocks.length > 0);
+      assert oldBlocks != null && oldBlocks.length > 0;
+
       // compare pBlock with the last block of oldBlocks
-      Block oldLastBlock = oldBlocks[oldBlocks.length - 1];
+      BlockInfo oldLastBlock = oldBlocks[oldBlocks.length - 1];
       if (oldLastBlock.getBlockId() != pBlock.getBlockId()
           || oldLastBlock.getGenerationStamp() != pBlock.getGenerationStamp()) {
         throw new IOException(
@@ -960,19 +980,25 @@ public class FSEditLogLoader {
       }
 
       oldLastBlock.setNumBytes(pBlock.getNumBytes());
-      if (oldLastBlock instanceof BlockInfoContiguousUnderConstruction) {
-        fsNamesys.getBlockManager().forceCompleteBlock(file,
-            (BlockInfoContiguousUnderConstruction) oldLastBlock);
+      if (!oldLastBlock.isComplete()) {
+        fsNamesys.getBlockManager().forceCompleteBlock(file, oldLastBlock);
         fsNamesys.getBlockManager().processQueuedMessagesForBlock(pBlock);
       }
     } else { // the penultimate block is null
       Preconditions.checkState(oldBlocks == null || oldBlocks.length == 0);
     }
     // add the new block
-    BlockInfo newBI = new BlockInfoContiguousUnderConstruction(
-          newBlock, file.getPreferredBlockReplication());
-    fsNamesys.getBlockManager().addBlockCollection(newBI, file);
-    file.addBlock(newBI);
+    final BlockInfo newBlockInfo;
+    boolean isStriped = ecZone != null;
+    if (isStriped) {
+      newBlockInfo = new BlockInfoStripedUnderConstruction(newBlock,
+          ecZone.getSchema(), ecZone.getCellSize());
+    } else {
+      newBlockInfo = new BlockInfoContiguousUnderConstruction(newBlock,
+          file.getPreferredBlockReplication());
+    }
+    fsNamesys.getBlockManager().addBlockCollectionWithCheck(newBlockInfo, file);
+    file.addBlock(newBlockInfo);
     fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock);
   }
 
@@ -981,7 +1007,8 @@ public class FSEditLogLoader {
    * @throws IOException
    */
   private void updateBlocks(FSDirectory fsDir, BlockListUpdatingOp op,
-      INodesInPath iip, INodeFile file) throws IOException {
+      INodesInPath iip, INodeFile file, ErasureCodingZone ecZone)
+      throws IOException {
     // Update its block list
     BlockInfo[] oldBlocks = file.getBlocks();
     Block[] newBlocks = op.getBlocks();
@@ -1010,11 +1037,10 @@ public class FSEditLogLoader {
           oldBlock.getGenerationStamp() != newBlock.getGenerationStamp();
       oldBlock.setGenerationStamp(newBlock.getGenerationStamp());
 
-      if (oldBlock instanceof BlockInfoContiguousUnderConstruction &&
+      if (!oldBlock.isComplete() &&
           (!isLastBlock || op.shouldCompleteLastBlock())) {
         changeMade = true;
-        fsNamesys.getBlockManager().forceCompleteBlock(file,
-            (BlockInfoContiguousUnderConstruction) oldBlock);
+        fsNamesys.getBlockManager().forceCompleteBlock(file, oldBlock);
       }
       if (changeMade) {
         // The state or gen-stamp of the block has changed. So, we may be
@@ -1041,25 +1067,38 @@ public class FSEditLogLoader {
         throw new IOException("Trying to delete non-existant block " + oldBlock);
       }
     } else if (newBlocks.length > oldBlocks.length) {
+      final boolean isStriped = ecZone != null;
       // We're adding blocks
       for (int i = oldBlocks.length; i < newBlocks.length; i++) {
         Block newBlock = newBlocks[i];
-        BlockInfo newBI;
+        final BlockInfo newBI;
         if (!op.shouldCompleteLastBlock()) {
           // TODO: shouldn't this only be true for the last block?
           // what about an old-version fsync() where fsync isn't called
           // until several blocks in?
-          newBI = new BlockInfoContiguousUnderConstruction(
-              newBlock, file.getPreferredBlockReplication());
+          if (isStriped) {
+            newBI = new BlockInfoStripedUnderConstruction(newBlock,
+                ecZone.getSchema(), ecZone.getCellSize());
+          } else {
+            newBI = new BlockInfoContiguousUnderConstruction(newBlock,
+                file.getPreferredBlockReplication());
+          }
         } else {
           // OP_CLOSE should add finalized blocks. This code path
           // is only executed when loading edits written by prior
           // versions of Hadoop. Current versions always log
           // OP_ADD operations as each block is allocated.
-          newBI = new BlockInfoContiguous(newBlock,
-              file.getPreferredBlockReplication());
+          // TODO: ECSchema can be restored from persisted file (HDFS-7859).
+          if (isStriped) {
+            newBI = new BlockInfoStriped(newBlock,
+                ErasureCodingSchemaManager.getSystemDefaultSchema(),
+                ecZone.getCellSize());
+          } else {
+            newBI = new BlockInfoContiguous(newBlock,
+                file.getPreferredBlockReplication());
+          }
         }
-        fsNamesys.getBlockManager().addBlockCollection(newBI, file);
+        fsNamesys.getBlockManager().addBlockCollectionWithCheck(newBI, file);
         file.addBlock(newBI);
         fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock);
       }
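[Editor's note] FSEditLogLoader now routes each ALLOCATE_BLOCK_ID entry to one of two cursors. The comment in the OP_ALLOCATE_BLOCK_ID hunk implies the ID-space convention: contiguous block IDs grow through positive values, while striped block-group IDs are negative. The sketch below writes isStripedBlockID from that comment, not from the real BlockIdManager source, so treat the sign test as an assumption:

```java
/** Illustrative stand-in for the two-cursor dispatch in FSEditLogLoader. */
class BlockIdCursors {
  private long lastContiguous;
  private long lastStriped;

  /** Assumption from the hunk's comment: striped block IDs are negative. */
  static boolean isStripedBlockID(long id) {
    return id < 0;
  }

  /** Mirrors the OP_ALLOCATE_BLOCK_ID replay path. */
  void applyAllocateBlockIdOp(long blockId) {
    if (isStripedBlockID(blockId)) {
      lastStriped = blockId;    // setLastAllocatedStripedBlockId(...)
    } else {
      lastContiguous = blockId; // setLastAllocatedContiguousBlockId(...)
    }
  }
}
```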
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
index 30517d0..1a1d6b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.fs.PathIsNotDirectoryException;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutFlags;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
@@ -359,7 +360,8 @@ public class FSImageFormat {
       // read the max sequential block ID.
       long maxSequentialBlockId = in.readLong();
-      namesystem.getBlockIdManager().setLastAllocatedBlockId(maxSequentialBlockId);
+      namesystem.getBlockIdManager().setLastAllocatedContiguousBlockId(
+          maxSequentialBlockId);
     } else {
 
       long startingGenStamp = namesystem.getBlockIdManager()
@@ -691,7 +693,7 @@ public class FSImageFormat {
     if (blocks != null) {
       final BlockManager bm = namesystem.getBlockManager();
       for (int i = 0; i < blocks.length; i++) {
-        file.setBlock(i, bm.addBlockCollection(blocks[i], file));
+        file.setBlock(i, bm.addBlockCollectionWithCheck(blocks[i], file));
       }
     }
   }
@@ -791,14 +793,14 @@ public class FSImageFormat {
         counter.increment();
       }
 
-      final INodeFile file = new INodeFile(inodeId, localName, permissions,
+      INodeFile file = new INodeFile(inodeId, localName, permissions,
           modificationTime, atime, blocks, replication, blockSize);
       if (underConstruction) {
         file.toUnderConstruction(clientName, clientMachine);
       }
-      return fileDiffs == null ? file : new INodeFile(file, fileDiffs);
-    } else if (numBlocks == -1) {
-      //directory
+      return fileDiffs == null ?
+          file : new INodeFile(file, fileDiffs);
+    } else if (numBlocks == -1) {
+      //directory
 
       //read quotas
       final long nsQuota = in.readLong();
@@ -897,7 +899,7 @@ public class FSImageFormat {
     final long preferredBlockSize = in.readLong();
 
     return new INodeFileAttributes.SnapshotCopy(name, permissions, null, modificationTime,
-        accessTime, replication, preferredBlockSize, (byte) 0, null);
+        accessTime, replication, preferredBlockSize, (byte) 0, null, false);
   }
 
   public INodeDirectoryAttributes loadINodeDirectoryAttributes(DataInput in)
@@ -961,8 +963,8 @@ public class FSImageFormat {
     if (oldnode.numBlocks() > 0) {
       BlockInfo ucBlock = cons.getLastBlock();
       // we do not replace the inode, just replace the last block of oldnode
-      BlockInfo info = namesystem.getBlockManager().addBlockCollection(
-          ucBlock, oldnode);
+      BlockInfo info = namesystem.getBlockManager()
+          .addBlockCollectionWithCheck(ucBlock, oldnode);
       oldnode.setBlock(oldnode.numBlocks() - 1, info);
     }
 
@@ -1269,7 +1271,7 @@ public class FSImageFormat {
       out.writeLong(sourceNamesystem.getBlockIdManager().getGenerationStampV1());
       out.writeLong(sourceNamesystem.getBlockIdManager().getGenerationStampV2());
       out.writeLong(sourceNamesystem.getBlockIdManager().getGenerationStampAtblockIdSwitch());
-      out.writeLong(sourceNamesystem.getBlockIdManager().getLastAllocatedBlockId());
+      out.writeLong(sourceNamesystem.getBlockIdManager().getLastAllocatedContiguousBlockId());
       out.writeLong(context.getTxId());
       out.writeLong(sourceNamesystem.dir.getLastInodeId());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
index e8378e5..c4fc1ce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
@@ -45,6 +45,8 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.LoaderContext;
 import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SaverContext;
@@ -68,6 +70,7 @@ import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.protobuf.ByteString;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 
 @InterfaceAudience.Private
 public final class FSImageFormatPBINode {
@@ -218,7 +221,7 @@ public final class FSImageFormatPBINode {
       final BlockInfo[] blocks = file.getBlocks();
       if (blocks != null) {
         for (int i = 0; i < blocks.length; i++) {
-          file.setBlock(i, bm.addBlockCollection(blocks[i], file));
+          file.setBlock(i, bm.addBlockCollectionWithCheck(blocks[i], file));
         }
       }
     }
@@ -331,27 +334,39 @@ public final class FSImageFormatPBINode {
       INodeSection.INodeFile f = n.getFile();
       List<BlockProto> bp = f.getBlocksList();
       short replication = (short) f.getReplication();
+      boolean isStriped = f.getIsStriped();
       LoaderContext state = parent.getLoaderContext();
+      ECSchema schema = ErasureCodingSchemaManager.getSystemDefaultSchema();
+      if (isStriped) {
+        Preconditions.checkState(f.hasStripingCellSize());
+      }
 
       BlockInfo[] blocks = new BlockInfo[bp.size()];
-      for (int i = 0, e = bp.size(); i < e; ++i) {
-        blocks[i] =
-            new BlockInfoContiguous(PBHelper.convert(bp.get(i)), replication);
+      for (int i = 0; i < bp.size(); ++i) {
+        BlockProto b = bp.get(i);
+        if (isStriped) {
+          blocks[i] = new BlockInfoStriped(PBHelper.convert(b), schema,
+              (int)f.getStripingCellSize());
+        } else {
+          blocks[i] = new BlockInfoContiguous(PBHelper.convert(b),
+              replication);
+        }
       }
+
       final PermissionStatus permissions = loadPermission(f.getPermission(),
           parent.getLoaderContext().getStringTable());
 
       final INodeFile file = new INodeFile(n.getId(),
           n.getName().toByteArray(), permissions, f.getModificationTime(),
           f.getAccessTime(), blocks, replication, f.getPreferredBlockSize(),
-          (byte)f.getStoragePolicyID());
+          (byte)f.getStoragePolicyID(), isStriped);
 
       if (f.hasAcl()) {
         int[] entries = AclEntryStatusFormat.toInt(loadAclEntries(
             f.getAcl(), state.getStringTable()));
         file.addAclFeature(new AclFeature(entries));
       }
-      
+
       if (f.hasXAttrs()) {
         file.addXAttrFeature(new XAttrFeature(
             loadXAttrs(f.getXAttrs(), state.getStringTable())));
@@ -364,8 +379,16 @@ public final class FSImageFormatPBINode {
         if (blocks.length > 0) {
           BlockInfo lastBlk = file.getLastBlock();
           // replace the last block of file
-          file.setBlock(file.numBlocks() - 1, new BlockInfoContiguousUnderConstruction(
-              lastBlk, replication));
+          final BlockInfo ucBlk;
+          if (isStriped) {
+            BlockInfoStriped striped = (BlockInfoStriped) lastBlk;
+            ucBlk = new BlockInfoStripedUnderConstruction(striped,
+                schema, (int)f.getStripingCellSize());
+          } else {
+            ucBlk = new BlockInfoContiguousUnderConstruction(lastBlk,
+                replication);
+          }
+          file.setBlock(file.numBlocks() - 1, ucBlk);
         }
       }
       return file;
@@ -479,7 +502,8 @@ public final class FSImageFormatPBINode {
         .setPermission(buildPermissionStatus(file, state.getStringMap()))
         .setPreferredBlockSize(file.getPreferredBlockSize())
         .setReplication(file.getFileReplication())
-        .setStoragePolicyID(file.getLocalStoragePolicyID());
+        .setStoragePolicyID(file.getLocalStoragePolicyID())
+        .setIsStriped(file.isStriped());
 
     AclFeature f = file.getAclFeature();
     if (f != null) {
@@ -633,13 +657,24 @@ public final class FSImageFormatPBINode {
     private void save(OutputStream out, INodeFile n) throws IOException {
       INodeSection.INodeFile.Builder b =
           buildINodeFile(n, parent.getSaverContext());
+      BlockInfo[] blocks = n.getBlocks();
 
-      if (n.getBlocks() != null) {
+      if (blocks != null) {
         for (Block block : n.getBlocks()) {
           b.addBlocks(PBHelper.convert(block));
         }
       }
 
+      if (n.isStriped()) {
+        if (blocks != null && blocks.length > 0) {
+          BlockInfo firstBlock = blocks[0];
+          Preconditions.checkState(firstBlock.isStriped());
+          b.setStripingCellSize(((BlockInfoStriped)firstBlock).getCellSize());
+        } else {
+          b.setStripingCellSize(HdfsConstants.BLOCK_STRIPED_CELL_SIZE);
+        }
+      }
+
       FileUnderConstructionFeature uc = n.getFileUnderConstructionFeature();
       if (uc != null) {
         INodeSection.FileUnderConstructionFeature f =
@@ -668,7 +703,7 @@ public final class FSImageFormatPBINode {
       r.writeDelimitedTo(out);
     }
 
-    private final INodeSection.INode.Builder buildINodeCommon(INode n) {
+    private INodeSection.INode.Builder buildINodeCommon(INode n) {
       return INodeSection.INode.newBuilder()
           .setId(n.getId())
           .setName(ByteString.copyFrom(n.getLocalNameBytes()));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
index 7c8a857..ef0cc1d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
@@ -297,7 +297,11 @@ public final class FSImageFormatProtobuf {
       blockIdManager.setGenerationStampV1(s.getGenstampV1());
       blockIdManager.setGenerationStampV2(s.getGenstampV2());
       blockIdManager.setGenerationStampV1Limit(s.getGenstampV1Limit());
-      blockIdManager.setLastAllocatedBlockId(s.getLastAllocatedBlockId());
+      blockIdManager.setLastAllocatedContiguousBlockId(s.getLastAllocatedBlockId());
+      if (s.hasLastAllocatedStripedBlockId()) {
+        blockIdManager.setLastAllocatedStripedBlockId(
+            s.getLastAllocatedStripedBlockId());
+      }
       imgTxId = s.getTransactionId();
       if (s.hasRollingUpgradeStartTime()
           && fsn.getFSImage().hasRollbackFSImage()) {
@@ -549,7 +553,8 @@ public final class FSImageFormatProtobuf {
           .setGenstampV1(blockIdManager.getGenerationStampV1())
           .setGenstampV1Limit(blockIdManager.getGenerationStampV1Limit())
           .setGenstampV2(blockIdManager.getGenerationStampV2())
-          .setLastAllocatedBlockId(blockIdManager.getLastAllocatedBlockId())
+          .setLastAllocatedBlockId(blockIdManager.getLastAllocatedContiguousBlockId())
+          .setLastAllocatedStripedBlockId(blockIdManager.getLastAllocatedStripedBlockId())
           .setTransactionId(context.getTxId());
 
       // We use the non-locked version of getNamespaceInfo here since
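[Editor's note] The protobuf loader above only restores the striped cursor when the field is present, so fsimages written before this change still load and simply keep the default. A sketch of that optional-field compatibility pattern, with NameSection reduced to a hand-written interface standing in for the generated protobuf class (field names follow the diff; everything else is simplified):

```java
/** Simplified stand-in for the generated NameSection message. */
interface NameSection {
  long getLastAllocatedBlockId();
  boolean hasLastAllocatedStripedBlockId();
  long getLastAllocatedStripedBlockId();
}

class BlockIdLoader {
  long lastContiguous;
  long lastStriped;

  void load(NameSection s) {
    lastContiguous = s.getLastAllocatedBlockId();
    // Optional field: fsimages written before HDFS-7285 omit it, so the
    // striped cursor keeps its default in that case.
    if (s.hasLastAllocatedStripedBlockId()) {
      lastStriped = s.getLastAllocatedStripedBlockId();
    }
  }
}
```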
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
index f71cf0b..c204f55 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
@@ -125,12 +125,12 @@ public class FSImageSerialization {
     short blockReplication = in.readShort();
     long modificationTime = in.readLong();
     long preferredBlockSize = in.readLong();
-
+
     int numBlocks = in.readInt();
     BlockInfo[] blocks = new BlockInfo[numBlocks];
     Block blk = new Block();
     int i = 0;
-    for (; i < numBlocks-1; i++) {
+    for (; i < numBlocks - 1; i++) {
       blk.readFields(in);
       blocks[i] = new BlockInfoContiguous(blk, blockReplication);
     }
     if(numBlocks > 0) {
       blk.readFields(in);
       blocks[i] = new BlockInfoContiguousUnderConstruction(
-        blk, blockReplication, BlockUCState.UNDER_CONSTRUCTION, null);
+          blk, blockReplication, BlockUCState.UNDER_CONSTRUCTION, null);
     }
+
     PermissionStatus perm = PermissionStatus.read(in);
     String clientName = readString(in);
     String clientMachine = readString(in);
@@ -180,9 +181,9 @@ public class FSImageSerialization {
   /**
    * Serialize a {@link INodeFile} node
-   * @param node The node to write
+   * @param file The node to write
    * @param out The {@link DataOutputStream} where the fields are written
-   * @param writeBlock Whether to write block information
+   * @param writeUnderConstruction Whether to write block information
    */
   public static void writeINodeFile(INodeFile file, DataOutput out,
       boolean writeUnderConstruction) throws IOException {
@@ -305,7 +306,7 @@ public class FSImageSerialization {
     if (!isWithName) {
       Preconditions.checkState(ref instanceof INodeReference.DstReference);
       // dst snapshot id
-      out.writeInt(((INodeReference.DstReference) ref).getDstSnapshotId());
+      out.writeInt(ref.getDstSnapshotId());
     } else {
       out.writeInt(((INodeReference.WithName) ref).getLastSnapshotId());
     }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 4cc3073..c0bfcc1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import static org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
@@ -58,9 +57,9 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDIT_LOG_AUTOROL
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
@@ -138,6 +137,7 @@ import org.apache.hadoop.crypto.CipherSuite;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
@@ -149,6 +149,7 @@ import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.XAttrSetFlag;
@@ -157,10 +158,8 @@ import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
-import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.ha.ServiceFailedException;
-import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
@@ -168,6 +167,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.UnknownCryptoProtocolVersionException;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
@@ -177,14 +177,17 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeException;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
@@ -200,6 +203,9 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
@@ -251,6 +257,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.RetryCache;
 import org.apache.hadoop.ipc.Server;
@@ -414,6 +421,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   private final BlockManager blockManager;
   private final SnapshotManager snapshotManager;
   private final CacheManager cacheManager;
+  private final ErasureCodingSchemaManager ecSchemaManager;
   private final DatanodeStatistics datanodeStatistics;
 
   private String nameserviceId;
@@ -590,6 +598,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     leaseManager.removeAllLeases();
     snapshotManager.clearSnapshottableDirs();
     cacheManager.clear();
+    ecSchemaManager.clear();
     setImageLoaded(false);
     blockManager.clear();
   }
@@ -827,6 +836,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       this.dir = new FSDirectory(this, conf);
       this.snapshotManager = new SnapshotManager(dir);
       this.cacheManager = new CacheManager(this, conf, blockManager);
+      this.ecSchemaManager = new ErasureCodingSchemaManager();
       this.safeMode = new SafeModeInfo(conf);
       this.topConf = new TopConf(conf);
       this.auditLoggers = initAuditLoggers(conf);
@@ -1755,8 +1765,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
     LocatedBlocks blocks = res.blocks;
     if (blocks != null) {
+      List<LocatedBlock> blkList = blocks.getLocatedBlocks();
+      if (blkList == null || blkList.size() == 0 ||
+          blkList.get(0) instanceof LocatedStripedBlock) {
+        // no need to sort locations for striped blocks
+        return blocks;
+      }
       blockManager.getDatanodeManager().sortLocatedBlocks(
-          clientMachine, blocks.getLocatedBlocks());
+          clientMachine, blkList);
 
       // lastBlock is not part of getLocatedBlocks(), might need to sort it too
       LocatedBlock lastBlock = blocks.getLastLocatedBlock();
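[Editor's note] getBlockLocations now returns early for striped files because the internal blocks of a block group are not interchangeable replicas: each one lives on a distinct DataNode and holds a different slice of the data, so reordering them by client proximity, as is done for contiguous replicas, is meaningless. A compact sketch of the guard, with LocatedBlock/LocatedStripedBlock reduced to a marker hierarchy (real classes live in org.apache.hadoop.hdfs.protocol):

```java
import java.util.List;

class SortGuard {
  static class LocatedBlock {}
  static class LocatedStripedBlock extends LocatedBlock {}

  /** Mirrors the early-return added to FSNamesystem#getBlockLocations. */
  static boolean shouldSortByDistance(List<LocatedBlock> blks) {
    // Striped block groups are not interchangeable replicas, so distance
    // sorting does not apply; empty lists have nothing to sort.
    return blks != null && !blks.isEmpty()
        && !(blks.get(0) instanceof LocatedStripedBlock);
  }
}
```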
@@ -2114,7 +2130,19 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     if (!DFSUtil.isValidName(src)) {
       throw new InvalidPathException(src);
     }
-    blockManager.verifyReplication(src, replication, clientMachine);
+
+    checkOperation(OperationCategory.READ);
+    readLock();
+    try {
+      checkOperation(OperationCategory.READ);
+      if (!FSDirErasureCodingOp.isInErasureCodingZone(this, src)) {
+        blockManager.verifyReplication(src, replication, clientMachine);
+      }
+    } finally {
+      readUnlock();
+    }
+
+    checkOperation(OperationCategory.WRITE);
     if (blockSize < minBlockSize) {
       throw new IOException("Specified block size is less than configured" +
           " minimum value (" + DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY
@@ -2455,6 +2483,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final long preferredblocksize;
     final byte storagePolicyID;
     final List<DatanodeStorageInfo> chosen;
+    final boolean isStriped;
     checkOperation(OperationCategory.READ);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     FSPermissionChecker pc = getPermissionChecker();
@@ -2481,6 +2510,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       clientnode = blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
       preferredblocksize = file.getPreferredBlockSize();
       storagePolicyID = file.getStoragePolicyID();
+      isStriped = file.isStriped();
 
       //find datanode storages
       final DatanodeManager dm = blockManager.getDatanodeManager();
@@ -2496,7 +2526,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     // choose new datanodes.
     final DatanodeStorageInfo[] targets = blockManager.chooseTarget4AdditionalDatanode(
         src, numAdditionalNodes, clientnode, chosen,
-        excludes, preferredblocksize, storagePolicyID);
+        excludes, preferredblocksize, storagePolicyID, isStriped);
     final LocatedBlock lb = BlockManager.newLocatedBlock(
         blk, targets, -1, false);
     blockManager.setBlockToken(lb, BlockTokenIdentifier.AccessMode.COPY);
@@ -2590,15 +2620,20 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       writeUnlock();
     }
     getEditLog().logSync();
+    if (success) {
+      NameNode.stateChangeLog.info("DIR* completeFile: " + src
+          + " is closed by " + holder);
+    }
     return success;
   }
 
   /**
    * Create new block with a unique block id and a new generation stamp.
+   * @param isStriped is the file under striping or contiguous layout?
    */
-  Block createNewBlock() throws IOException {
+  Block createNewBlock(boolean isStriped) throws IOException {
     assert hasWriteLock();
-    Block b = new Block(nextBlockId(), 0, 0);
+    Block b = new Block(nextBlockId(isStriped), 0, 0);
     // Increment the generation stamp for every new block.
     b.setGenerationStamp(nextGenerationStamp(false));
     return b;
@@ -2792,7 +2827,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         if (trackBlockCounts) {
           if (b.isComplete()) {
             numRemovedComplete++;
-            if (blockManager.checkMinReplication(b)) {
+            if (blockManager.hasMinStorage(b, b.numNodes())) {
               numRemovedSafe++;
             }
           }
@@ -3024,7 +3059,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       curBlock = blocks[nrCompleteBlocks];
       if(!curBlock.isComplete())
         break;
-      assert blockManager.checkMinReplication(curBlock) :
+      assert blockManager.hasMinStorage(curBlock) :
         "A COMPLETE block is not minimally replicated in " + src;
     }
 
@@ -3059,8 +3094,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
     BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
     // If penultimate block doesn't exist then its minReplication is met
-    boolean penultimateBlockMinReplication = penultimateBlock == null ? true :
-        blockManager.checkMinReplication(penultimateBlock);
+    boolean penultimateBlockMinStorage = penultimateBlock == null ||
+        blockManager.hasMinStorage(penultimateBlock);
 
     switch(lastBlockState) {
     case COMPLETE:
       break;
     case COMMITTED:
       // Close file if committed blocks are minimally replicated
-      if(penultimateBlockMinReplication &&
-          blockManager.checkMinReplication(lastBlock)) {
+      if(penultimateBlockMinStorage &&
+          blockManager.hasMinStorage(lastBlock)) {
         finalizeINodeFileUnderConstruction(src, pendingFile,
             iip.getLatestSnapshotId());
         NameNode.stateChangeLog.warn("BLOCK*"
@@ -3089,16 +3124,19 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       throw new AlreadyBeingCreatedException(message);
     case UNDER_CONSTRUCTION:
     case UNDER_RECOVERY:
-      final BlockInfoContiguousUnderConstruction uc = (BlockInfoContiguousUnderConstruction)lastBlock;
+      // TODO support truncate of striped blocks
+      final BlockInfoUnderConstruction uc =
+          (BlockInfoUnderConstruction)lastBlock;
       // determine if last block was intended to be truncated
       Block recoveryBlock = uc.getTruncateBlock();
       boolean truncateRecovery = recoveryBlock != null;
       boolean copyOnTruncate = truncateRecovery &&
-          recoveryBlock.getBlockId() != uc.getBlockId();
+          recoveryBlock.getBlockId() != uc.toBlock().getBlockId();
       assert !copyOnTruncate ||
-          recoveryBlock.getBlockId() < uc.getBlockId() &&
-          recoveryBlock.getGenerationStamp() < uc.getGenerationStamp() &&
-          recoveryBlock.getNumBytes() > uc.getNumBytes() :
+          recoveryBlock.getBlockId() < uc.toBlock().getBlockId() &&
+          recoveryBlock.getGenerationStamp() < uc.toBlock().
+              getGenerationStamp() &&
+          recoveryBlock.getNumBytes() > uc.toBlock().getNumBytes() :
         "wrong recoveryBlock";
 
       // setup the last block locations from the blockManager if not known
@@ -3106,7 +3144,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         uc.setExpectedLocations(blockManager.getStorages(lastBlock));
       }
 
-      if (uc.getNumExpectedLocations() == 0 && uc.getNumBytes() == 0) {
+      if (uc.getNumExpectedLocations() == 0 &&
+          uc.toBlock().getNumBytes() == 0) {
         // There is no datanode reported to this block.
         // may be client have crashed before writing data to pipeline.
         // This blocks doesn't need any recovery.
@@ -3119,10 +3158,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         return true;
       }
       // start recovery of the last block for this file
-      long blockRecoveryId = nextGenerationStamp(blockIdManager.isLegacyBlock(uc));
+      long blockRecoveryId =
+          nextGenerationStamp(blockIdManager.isLegacyBlock(uc.toBlock()));
       lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
       if(copyOnTruncate) {
-        uc.setGenerationStamp(blockRecoveryId);
+        uc.toBlock().setGenerationStamp(blockRecoveryId);
       } else if(truncateRecovery) {
         recoveryBlock.setGenerationStamp(blockRecoveryId);
       }
@@ -3167,10 +3207,32 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
 
     // Adjust disk space consumption if required
-    final long diff = fileINode.getPreferredBlockSize() - commitBlock.getNumBytes();
+    final long diff;
+    final short replicationFactor;
+    if (fileINode.isStriped()) {
+      final ErasureCodingZone ecZone = FSDirErasureCodingOp
+          .getErasureCodingZone(this, iip);
+      final ECSchema ecSchema = ecZone.getSchema();
+      final short numDataUnits = (short) ecSchema.getNumDataUnits();
+      final short numParityUnits = (short) ecSchema.getNumParityUnits();
+
+      final long numBlocks = numDataUnits + numParityUnits;
+      final long fullBlockGroupSize =
+          fileINode.getPreferredBlockSize() * numBlocks;
+
+      final BlockInfoStriped striped = new BlockInfoStriped(commitBlock,
+          ecSchema, ecZone.getCellSize());
+      final long actualBlockGroupSize = striped.spaceConsumed();
+
+      diff = fullBlockGroupSize - actualBlockGroupSize;
+      replicationFactor = (short) 1;
+    } else {
+      diff = fileINode.getPreferredBlockSize() - commitBlock.getNumBytes();
+      replicationFactor = fileINode.getFileReplication();
+    }
     if (diff > 0) {
       try {
-        dir.updateSpaceConsumed(iip, 0, -diff, fileINode.getFileReplication());
+        dir.updateSpaceConsumed(iip, 0, -diff, replicationFactor);
       } catch (IOException e) {
         LOG.warn("Unexpected exception while updating disk space.", e);
       }
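[Editor's note] A worked example of the quota adjustment above: space is pre-reserved for a full block group (preferredBlockSize times data+parity units), and whatever the committed group did not actually consume is released, at replication factor 1 because striped groups are not replicated. The real BlockInfoStriped.spaceConsumed() also accounts for partial cells; this sketch takes the actual consumed size as an input and only shows the arithmetic:

```java
/**
 * Worked example of the striped-branch arithmetic in
 * commitOrCompleteLastBlock; all values are illustrative (RS-6-3, 128 MB
 * internal blocks) and actualBlockGroupSize is supplied directly.
 */
class StripedQuotaAdjust {
  public static void main(String[] args) {
    long preferredBlockSize = 128L << 20; // 128 MB per internal block
    int dataUnits = 6, parityUnits = 3;   // RS-6-3 schema (assumed)

    long fullBlockGroupSize = preferredBlockSize * (dataUnits + parityUnits);
    long actualBlockGroupSize = 192L << 20; // e.g. 128 MB data + 64 MB parity

    // Release the over-reservation made when the block group was allocated.
    long diff = fullBlockGroupSize - actualBlockGroupSize; // 960 MB
    short replicationFactor = 1; // striped groups are not replicated

    System.out.println("release " + diff + " bytes x " + replicationFactor);
  }
}
```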
@@ -3325,8 +3386,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         if (remove) {
           blockManager.removeBlock(storedBlock);
         }
-      }
-      else {
+      } else {
         // update last block
         if(!copyTruncate) {
           storedBlock.setGenerationStamp(newgenerationstamp);
@@ -3360,9 +3420,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
               trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i));
           if (storageInfo != null) {
             if(copyTruncate) {
-              storageInfo.addBlock(truncatedBlock);
+              storageInfo.addBlock(truncatedBlock, truncatedBlock);
             } else {
-              storageInfo.addBlock(storedBlock);
+              storageInfo.addBlock(storedBlock, storedBlock);
             }
           }
         }
@@ -3374,12 +3434,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           trimmedTargets.toArray(new DatanodeID[trimmedTargets.size()]),
           trimmedStorages.toArray(new String[trimmedStorages.size()]));
       if(copyTruncate) {
-        iFile.setLastBlock(truncatedBlock, trimmedStorageInfos);
+        iFile.convertLastBlockToUC(truncatedBlock, trimmedStorageInfos);
       } else {
-        iFile.setLastBlock(storedBlock, trimmedStorageInfos);
+        iFile.convertLastBlockToUC(storedBlock, trimmedStorageInfos);
         if (closeFile) {
-          blockManager.markBlockReplicasAsCorrupt(storedBlock,
-              oldGenerationStamp, oldNumBytes, trimmedStorageInfos);
+          blockManager.markBlockReplicasAsCorrupt(oldBlock.getLocalBlock(),
+              storedBlock, oldGenerationStamp, oldNumBytes,
+              trimmedStorageInfos);
         }
       }
     }
@@ -3387,7 +3448,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     if (closeFile) {
       if(copyTruncate) {
         src = closeFileCommitBlocks(iFile, truncatedBlock);
-        if(!iFile.isBlockInLatestSnapshot(storedBlock)) {
+        if(!iFile.isBlockInLatestSnapshot((BlockInfoContiguous) storedBlock)) {
           blockManager.removeBlock(storedBlock);
         }
       } else {
@@ -3714,7 +3775,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     while (it.hasNext()) {
       Block b = it.next();
-      BlockInfo blockInfo = blockManager.getStoredBlock(b);
+      BlockInfo blockInfo = getStoredBlock(b);
       if (blockInfo.getBlockCollection().getStoragePolicyID()
           == lpPolicy.getId()) {
         filesToDelete.add(blockInfo.getBlockCollection());
@@ -4353,10 +4414,16 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   /**
    * Increment number of safe blocks if current block has
    * reached minimal replication.
-   * @param replication current replication
+   * @param storageNum current number of replicas, or number of internal
+   *                   blocks of a striped block group
+   * @param storedBlock the reported block, either a BlockInfoContiguous
+   *                    or a BlockInfoStriped
    */
-  private synchronized void incrementSafeBlockCount(short replication) {
-    if (replication == safeReplication) {
+  private synchronized void incrementSafeBlockCount(short storageNum,
+      BlockInfo storedBlock) {
+    final int safe = storedBlock.isStriped() ?
+        ((BlockInfoStriped) storedBlock).getRealDataBlockNum() : safeReplication;
+    if (storageNum == safe) {
       this.blockSafe++;

       // Report startup progress only if we haven't completed startup yet.
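The new incrementSafeBlockCount above keys the safe-mode threshold on the block layout. A toy sketch of that decision (illustrative names, not the patch's code):

final class SafeThresholdSketch {
  /**
   * How many reported storages make a block "safe": the configured minimum
   * replication for contiguous blocks, or the number of data blocks
   * actually present in a (possibly short) striped group.
   */
  static int safeThreshold(boolean isStriped, int realDataBlockNum,
      int safeReplication) {
    return isStriped ? realDataBlockNum : safeReplication;
  }

  public static void main(String[] args) {
    System.out.println(safeThreshold(false, 0, 1)); // contiguous: 1 replica
    System.out.println(safeThreshold(true, 6, 1));  // full RS-6-3 group: 6
  }
}

The rationale: parity blocks can be reconstructed, so safe mode only needs the group's real data blocks reported, while a contiguous block still needs the configured minimum number of replicas.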
@@ -4649,12 +4716,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }

   @Override
-  public void incrementSafeBlockCount(int replication) {
+  public void incrementSafeBlockCount(int storageNum, BlockInfo storedBlock) {
     // safeMode is volatile, and may be set to null at any time
     SafeModeInfo safeMode = this.safeMode;
     if (safeMode == null)
       return;
-    safeMode.incrementSafeBlockCount((short)replication);
+    safeMode.incrementSafeBlockCount((short) storageNum, storedBlock);
   }

   @Override
@@ -5154,11 +5221,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   /**
    * Increments, logs and then returns the block ID
+   * @param isStriped whether the file uses the striped layout rather than
+   *                  the contiguous one
    */
-  private long nextBlockId() throws IOException {
+  private long nextBlockId(boolean isStriped) throws IOException {
     assert hasWriteLock();
     checkNameNodeSafeMode("Cannot get next block ID");
-    final long blockId = blockIdManager.nextBlockId();
+    final long blockId = isStriped ?
+        blockIdManager.nextStripedBlockId() : blockIdManager.nextContiguousBlockId();
     getEditLog().logAllocateBlockId(blockId);
     // NB: callers sync the log
     return blockId;
@@ -5262,29 +5331,32 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   /**
    * Get a new generation stamp together with an access token for
    * a block under construction
    *
-   * This method is called for recovering a failed pipeline or setting up
-   * a pipeline to append to a block.
+   * This method is called for recovering a failed write or setting up
+   * a block for append.
    *
    * @param block a block
    * @param clientName the name of a client
    * @return a located block with a new generation stamp and an access token
    * @throws IOException if any error occurs
    */
-  LocatedBlock updateBlockForPipeline(ExtendedBlock block,
+  LocatedBlock bumpBlockGenerationStamp(ExtendedBlock block,
       String clientName) throws IOException {
-    LocatedBlock locatedBlock;
+    final LocatedBlock locatedBlock;
     checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);

       // check validity of parameters
-      checkUCBlock(block, clientName);
+      final INodeFile file = checkUCBlock(block, clientName);

       // get a new generation stamp and an access token
       block.setGenerationStamp(nextGenerationStamp(blockIdManager.isLegacyBlock(block.getLocalBlock())));
-      locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
-      blockManager.setBlockToken(locatedBlock, BlockTokenIdentifier.AccessMode.WRITE);
+
+      locatedBlock = BlockManager.newLocatedBlock(
+          block, file.getLastBlock(), null, -1);
+      blockManager.setBlockToken(locatedBlock,
+          BlockTokenIdentifier.AccessMode.WRITE);
     } finally {
       writeUnlock();
     }
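nextBlockId(boolean) above dispatches to two disjoint generators inside BlockIdManager, so a striped group's id can later address its internal blocks by index. A toy model of split ID spaces follows; the step of 16 and the negative range mirror the usual reserve-per-group design, but the actual encoding is whatever BlockIdManager defines, so treat these constants as assumptions:

final class BlockIdSpacesSketch {
  // Illustrative only: contiguous ids count up from a small positive base,
  // while striped group ids come from a separate range stepped by the
  // maximum group size, leaving room for groupId + indexInGroup.
  private static final int MAX_BLOCKS_IN_GROUP = 16;
  private long lastContiguousId = 1024;
  private long lastStripedGroupId = Long.MIN_VALUE;

  long nextBlockId(boolean isStriped) {
    if (isStriped) {
      lastStripedGroupId += MAX_BLOCKS_IN_GROUP;
      return lastStripedGroupId;
    }
    return ++lastContiguousId;
  }
}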
@@ -5336,21 +5408,27 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     assert hasWriteLock();
     // check the validity of the block and lease holder name
     final INodeFile pendingFile = checkUCBlock(oldBlock, clientName);
-    final BlockInfoContiguousUnderConstruction blockinfo
-        = (BlockInfoContiguousUnderConstruction)pendingFile.getLastBlock();
+    final BlockInfo lastBlock = pendingFile.getLastBlock();
+    final BlockInfoUnderConstruction blockinfo = (BlockInfoUnderConstruction)lastBlock;

     // check new GS & length: this is not expected
-    if (newBlock.getGenerationStamp() <= blockinfo.getGenerationStamp() ||
-        newBlock.getNumBytes() < blockinfo.getNumBytes()) {
-      String msg = "Update " + oldBlock + " (len = " +
-          blockinfo.getNumBytes() + ") to an older state: " + newBlock +
-          " (len = " + newBlock.getNumBytes() +")";
+    if (newBlock.getGenerationStamp() <= lastBlock.getGenerationStamp()) {
+      final String msg = "Update " + oldBlock + " but the new block " + newBlock +
+          " does not have a larger generation stamp than the last block " +
+          lastBlock;
+      LOG.warn(msg);
+      throw new IOException(msg);
+    }
+    if (newBlock.getNumBytes() < lastBlock.getNumBytes()) {
+      final String msg = "Update " + oldBlock + " (size=" +
+          oldBlock.getNumBytes() + ") to a smaller size block " + newBlock +
+          " (size=" + newBlock.getNumBytes() + ")";
       LOG.warn(msg);
       throw new IOException(msg);
     }

     // Update old block with the new generation stamp and new length
-    blockinfo.setNumBytes(newBlock.getNumBytes());
+    lastBlock.setNumBytes(newBlock.getNumBytes());
     blockinfo.setGenerationStampAndVerifyReplicas(newBlock.getGenerationStamp());

     // find the DatanodeDescriptor objects
@@ -6122,17 +6200,29 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   public FSDirectory getFSDirectory() {
     return dir;
   }
+  /** Set the FSDirectory. */
   @VisibleForTesting
   public void setFSDirectory(FSDirectory dir) {
     this.dir = dir;
   }
+  /** @return the cache manager. */
   @Override
   public CacheManager getCacheManager() {
     return cacheManager;
   }

+  /** @return the ErasureCodingSchemaManager. */
+  public ErasureCodingSchemaManager getErasureCodingSchemaManager() {
+    return ecSchemaManager;
+  }
+
+  /** @return the ErasureCodingZoneManager. */
+  public ErasureCodingZoneManager getErasureCodingZoneManager() {
+    return dir.ecZoneManager;
+  }
+
   @Override // NameNodeMXBean
   public String getCorruptFiles() {
     List<String> list = new ArrayList<String>();
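The two separate validations above replace one combined check, so a failed pipeline update now reports which rule was broken. Reduced to its essentials, the rule is that generation stamps must strictly grow while lengths must never shrink; a minimal sketch (not the patch's code):

import java.io.IOException;

final class PipelineUpdateCheckSketch {
  static void validate(long oldGS, long newGS, long oldLen, long newLen)
      throws IOException {
    if (newGS <= oldGS) { // a recovery must bump the generation stamp
      throw new IOException("new generation stamp " + newGS
          + " is not greater than the current one " + oldGS);
    }
    if (newLen < oldLen) { // pipeline recovery may extend, never truncate
      throw new IOException("new length " + newLen
          + " is smaller than the current length " + oldLen);
    }
  }
}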
@@ -7078,6 +7168,85 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
   }

+  /**
+   * Create an erasure coding zone on directory src.
+   * @param srcArg the path of a directory which will be the root of the
+   *               erasure coding zone. The directory must be empty.
+   * @param schema ECSchema for the erasure coding zone
+   * @param cellSize Cell size of stripe
+   * @throws AccessControlException if the caller is not the superuser.
+   * @throws UnresolvedLinkException if the path can't be resolved.
+   * @throws SafeModeException if the Namenode is in safe mode.
+   */
+  void createErasureCodingZone(final String srcArg, final ECSchema schema,
+      int cellSize, final boolean logRetryCache) throws IOException,
+      UnresolvedLinkException, SafeModeException, AccessControlException {
+    checkSuperuserPrivilege();
+    checkOperation(OperationCategory.WRITE);
+    HdfsFileStatus resultingStat = null;
+    boolean success = false;
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      checkNameNodeSafeMode("Cannot create erasure coding zone on " + srcArg);
+      resultingStat = FSDirErasureCodingOp.createErasureCodingZone(this,
+          srcArg, schema, cellSize, logRetryCache);
+      success = true;
+    } finally {
+      writeUnlock();
+      if (success) {
+        getEditLog().logSync();
+      }
+      logAuditEvent(success, "createErasureCodingZone", srcArg, null,
+          resultingStat);
+    }
+  }
+
+  /**
+   * Get the erasure coding zone information for specified path
+   */
+  ErasureCodingZone getErasureCodingZone(String src)
+      throws AccessControlException, UnresolvedLinkException, IOException {
+    checkOperation(OperationCategory.READ);
+    readLock();
+    try {
+      checkOperation(OperationCategory.READ);
+      return getErasureCodingZoneForPath(src);
+    } finally {
+      readUnlock();
+    }
+  }
+
+  /**
+   * Get available erasure coding schemas
+   */
+  ECSchema[] getErasureCodingSchemas() throws IOException {
+    checkOperation(OperationCategory.READ);
+    waitForLoadingFSImage();
+    readLock();
+    try {
+      checkOperation(OperationCategory.READ);
+      return FSDirErasureCodingOp.getErasureCodingSchemas(this);
+    } finally {
+      readUnlock();
+    }
+  }
+
+  /**
+   * Get the ECSchema specified by the name
+   */
+  ECSchema getErasureCodingSchema(String schemaName) throws IOException {
+    checkOperation(OperationCategory.READ);
+    waitForLoadingFSImage();
+    readLock();
+    try {
+      checkOperation(OperationCategory.READ);
+      return FSDirErasureCodingOp.getErasureCodingSchema(this, schemaName);
+    } finally {
+      readUnlock();
+    }
+  }
+
   void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag,
       boolean logRetryCache)
       throws IOException {
@@ -7260,5 +7429,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
   }

+  @Override
+  public ErasureCodingZone getErasureCodingZoneForPath(String src)
+      throws IOException {
+    return FSDirErasureCodingOp.getErasureCodingZone(this, src);
+  }
 }
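For orientation before the next file, here is how a client might drive the zone APIs added above. This is a sketch under assumptions: it presumes the branch's DistributedFileSystem exposes matching createErasureCodingZone/getErasureCodingZone wrappers and that a null schema with a zero cell size falls back to the system defaults; the client-side signatures may differ.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class EcZoneClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    DistributedFileSystem dfs = (DistributedFileSystem)
        FileSystem.get(URI.create("hdfs://namenode:8020"), conf);
    Path dir = new Path("/striped");
    dfs.mkdirs(dir); // the zone root must be an empty directory
    // null schema / 0 cellSize: assumed to select the system defaults
    dfs.createErasureCodingZone(dir, null, 0);
    System.out.println(dfs.getErasureCodingZone(dir));
  }
}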
block for path " + f.getFullPathName() + " is null when updating its length"; - assert (lastBlock instanceof BlockInfoContiguousUnderConstruction) + assert !lastBlock.isComplete() : "The last block for path " + f.getFullPathName() - + " is not a BlockInfoUnderConstruction when updating its length"; + + " is not a BlockInfoUnderConstruction when updating its length"; lastBlock.setNumBytes(lastBlockLength); } @@ -76,9 +75,8 @@ public class FileUnderConstructionFeature implements INode.Feature { final BlocksMapUpdateInfo collectedBlocks) { final BlockInfo[] blocks = f.getBlocks(); if (blocks != null && blocks.length > 0 - && blocks[blocks.length - 1] instanceof BlockInfoContiguousUnderConstruction) { - BlockInfoContiguousUnderConstruction lastUC = - (BlockInfoContiguousUnderConstruction) blocks[blocks.length - 1]; + && !blocks[blocks.length - 1].isComplete()) { + BlockInfo lastUC = blocks[blocks.length - 1]; if (lastUC.getNumBytes() == 0) { // this is a 0-sized block. do not need check its UC state here collectedBlocks.addDeleteBlock(lastUC);