Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 929F61873A for ; Wed, 22 Jul 2015 00:28:48 +0000 (UTC) Received: (qmail 2914 invoked by uid 500); 22 Jul 2015 00:28:42 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 2850 invoked by uid 500); 22 Jul 2015 00:28:42 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 2841 invoked by uid 99); 22 Jul 2015 00:28:42 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Jul 2015 00:28:42 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0AAEDE07F6; Wed, 22 Jul 2015 00:28:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wheat9@apache.org To: common-commits@hadoop.apache.org Message-Id: <169ae3660e5e40aca79f4fd1e3164786@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: HDFS-8495. Consolidate append() related implementation into a single class. Contributed by Rakesh R. Date: Wed, 22 Jul 2015 00:28:42 +0000 (UTC) Repository: hadoop Updated Branches: refs/heads/trunk 393fe7177 -> 31f117138 HDFS-8495. Consolidate append() related implementation into a single class. Contributed by Rakesh R. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/31f11713 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/31f11713 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/31f11713 Branch: refs/heads/trunk Commit: 31f117138a00794de4951ee8433e304d72b04094 Parents: 393fe71 Author: Haohui Mai Authored: Tue Jul 21 17:25:23 2015 -0700 Committer: Haohui Mai Committed: Tue Jul 21 17:25:23 2015 -0700 ---------------------------------------------------------------------- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hdfs/server/namenode/FSDirAppendOp.java | 261 +++++++++++++++++++ .../server/namenode/FSDirStatAndListingOp.java | 2 +- .../hdfs/server/namenode/FSDirTruncateOp.java | 16 +- .../hdfs/server/namenode/FSDirWriteFileOp.java | 6 +- .../hdfs/server/namenode/FSEditLogLoader.java | 4 +- .../hdfs/server/namenode/FSNamesystem.java | 241 ++--------------- 7 files changed, 304 insertions(+), 229 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/31f11713/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 8122045..50803de 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -737,6 +737,9 @@ Release 2.8.0 - UNRELEASED HDFS-8721. Add a metric for number of encryption zones. (Rakesh R via cnauroth) + HDFS-8495. Consolidate append() related implementation into a single class. + (Rakesh R via wheat9) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than http://git-wip-us.apache.org/repos/asf/hadoop/blob/31f11713/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java new file mode 100644 index 0000000..abb2dc8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.QuotaExceededException; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp; +import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion.Feature; + +import com.google.common.base.Preconditions; + +/** + * Helper class to perform append operation. + */ +final class FSDirAppendOp { + + /** + * Private constructor for preventing FSDirAppendOp object creation. + * Static-only class. + */ + private FSDirAppendOp() {} + + /** + * Append to an existing file. + *

+ * + * The method returns the last block of the file if this is a partial block, + * which can still be used for writing more data. The client uses the + * returned block locations to form the data pipeline for this block.
+ * The {@link LocatedBlock} will be null if the last block is full. + * The client then allocates a new block with the next call using + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#addBlock}. + *

+ * + * For description of parameters and exceptions thrown see + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#append} + * + * @param fsn namespace + * @param srcArg path name + * @param pc permission checker to check fs permission + * @param holder client name + * @param clientMachine client machine info + * @param newBlock if the data is appended to a new block + * @param logRetryCache whether to record RPC ids in editlog for retry cache + * rebuilding + * @return the last block with status + */ + static LastBlockWithStatus appendFile(final FSNamesystem fsn, + final String srcArg, final FSPermissionChecker pc, final String holder, + final String clientMachine, final boolean newBlock, + final boolean logRetryCache) throws IOException { + assert fsn.hasWriteLock(); + + final byte[][] pathComponents = FSDirectory + .getPathComponentsForReservedPath(srcArg); + final LocatedBlock lb; + final FSDirectory fsd = fsn.getFSDirectory(); + final String src; + fsd.writeLock(); + try { + src = fsd.resolvePath(pc, srcArg, pathComponents); + final INodesInPath iip = fsd.getINodesInPath4Write(src); + // Verify that the destination does not exist as a directory already + final INode inode = iip.getLastINode(); + final String path = iip.getPath(); + if (inode != null && inode.isDirectory()) { + throw new FileAlreadyExistsException("Cannot append to directory " + + path + "; already exists as a directory."); + } + if (fsd.isPermissionEnabled()) { + fsd.checkPathAccess(pc, iip, FsAction.WRITE); + } + + if (inode == null) { + throw new FileNotFoundException( + "Failed to append to non-existent file " + path + " for client " + + clientMachine); + } + final INodeFile file = INodeFile.valueOf(inode, path, true); + BlockManager blockManager = fsd.getBlockManager(); + final BlockStoragePolicy lpPolicy = blockManager + .getStoragePolicy("LAZY_PERSIST"); + if (lpPolicy != null && lpPolicy.getId() == file.getStoragePolicyID()) { + throw new UnsupportedOperationException( + "Cannot append to lazy persist file " + path); + } + // Opening an existing file for append - may need to recover lease. + fsn.recoverLeaseInternal(RecoverLeaseOp.APPEND_FILE, iip, path, holder, + clientMachine, false); + + final BlockInfo lastBlock = file.getLastBlock(); + // Check that the block has at least minimum replication. + if (lastBlock != null && lastBlock.isComplete() + && !blockManager.isSufficientlyReplicated(lastBlock)) { + throw new IOException("append: lastBlock=" + lastBlock + " of src=" + + path + " is not sufficiently replicated yet."); + } + lb = prepareFileForAppend(fsn, iip, holder, clientMachine, newBlock, + true, logRetryCache); + } catch (IOException ie) { + NameNode.stateChangeLog + .warn("DIR* NameSystem.append: " + ie.getMessage()); + throw ie; + } finally { + fsd.writeUnlock(); + } + + HdfsFileStatus stat = FSDirStatAndListingOp.getFileInfo(fsd, src, false, + FSDirectory.isReservedRawName(srcArg), true); + if (lb != null) { + NameNode.stateChangeLog.debug( + "DIR* NameSystem.appendFile: file {} for {} at {} block {} block" + + " size {}", srcArg, holder, clientMachine, lb.getBlock(), lb + .getBlock().getNumBytes()); + } + return new LastBlockWithStatus(lb, stat); + } + + /** + * Convert current node to under construction. + * Recreate in-memory lease record. + * + * @param fsn namespace + * @param iip inodes in the path containing the file + * @param leaseHolder identifier of the lease holder on this file + * @param clientMachine identifier of the client machine + * @param newBlock if the data is appended to a new block + * @param writeToEditLog whether to persist this change to the edit log + * @param logRetryCache whether to record RPC ids in editlog for retry cache + * rebuilding + * @return the last block locations if the block is partial or null otherwise + * @throws IOException + */ + static LocatedBlock prepareFileForAppend(final FSNamesystem fsn, + final INodesInPath iip, final String leaseHolder, + final String clientMachine, final boolean newBlock, + final boolean writeToEditLog, final boolean logRetryCache) + throws IOException { + assert fsn.hasWriteLock(); + + final INodeFile file = iip.getLastINode().asFile(); + final QuotaCounts delta = verifyQuotaForUCBlock(fsn, file, iip); + + file.recordModification(iip.getLatestSnapshotId()); + file.toUnderConstruction(leaseHolder, clientMachine); + + fsn.getLeaseManager().addLease( + file.getFileUnderConstructionFeature().getClientName(), file.getId()); + + LocatedBlock ret = null; + if (!newBlock) { + FSDirectory fsd = fsn.getFSDirectory(); + ret = fsd.getBlockManager().convertLastBlockToUnderConstruction(file, 0); + if (ret != null && delta != null) { + Preconditions.checkState(delta.getStorageSpace() >= 0, "appending to" + + " a block with size larger than the preferred block size"); + fsd.writeLock(); + try { + fsd.updateCountNoQuotaCheck(iip, iip.length() - 1, delta); + } finally { + fsd.writeUnlock(); + } + } + } else { + BlockInfo lastBlock = file.getLastBlock(); + if (lastBlock != null) { + ExtendedBlock blk = new ExtendedBlock(fsn.getBlockPoolId(), lastBlock); + ret = new LocatedBlock(blk, new DatanodeInfo[0]); + } + } + + if (writeToEditLog) { + final String path = iip.getPath(); + if (NameNodeLayoutVersion.supports(Feature.APPEND_NEW_BLOCK, + fsn.getEffectiveLayoutVersion())) { + fsn.getEditLog().logAppendFile(path, file, newBlock, logRetryCache); + } else { + fsn.getEditLog().logOpenFile(path, file, false, logRetryCache); + } + } + return ret; + } + + /** + * Verify quota when using the preferred block size for UC block. This is + * usually used by append and truncate. + * + * @throws QuotaExceededException when violating the storage quota + * @return expected quota usage update. null means no change or no need to + * update quota usage later + */ + private static QuotaCounts verifyQuotaForUCBlock(FSNamesystem fsn, + INodeFile file, INodesInPath iip) throws QuotaExceededException { + FSDirectory fsd = fsn.getFSDirectory(); + if (!fsn.isImageLoaded() || fsd.shouldSkipQuotaChecks()) { + // Do not check quota if editlog is still being processed + return null; + } + if (file.getLastBlock() != null) { + final QuotaCounts delta = computeQuotaDeltaForUCBlock(fsn, file); + fsd.readLock(); + try { + FSDirectory.verifyQuota(iip, iip.length() - 1, delta, null); + return delta; + } finally { + fsd.readUnlock(); + } + } + return null; + } + + /** Compute quota change for converting a complete block to a UC block. */ + private static QuotaCounts computeQuotaDeltaForUCBlock(FSNamesystem fsn, + INodeFile file) { + final QuotaCounts delta = new QuotaCounts.Builder().build(); + final BlockInfo lastBlock = file.getLastBlock(); + if (lastBlock != null) { + final long diff = file.getPreferredBlockSize() - lastBlock.getNumBytes(); + final short repl = file.getPreferredBlockReplication(); + delta.addStorageSpace(diff * repl); + final BlockStoragePolicy policy = fsn.getFSDirectory() + .getBlockStoragePolicySuite().getPolicy(file.getStoragePolicyID()); + List types = policy.chooseStorageTypes(repl); + for (StorageType t : types) { + if (t.supportTypeQuota()) { + delta.addTypeSpace(t, diff); + } + } + } + return delta; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31f11713/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java index 201dabc..14f4d66 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java @@ -508,7 +508,7 @@ class FSDirStatAndListingOp { final long fileSize = !inSnapshot && isUc ? fileNode.computeFileSizeNotIncludingLastUcBlock() : size; - loc = fsd.getFSNamesystem().getBlockManager().createLocatedBlocks( + loc = fsd.getBlockManager().createLocatedBlocks( fileNode.getBlocks(snapshot), fileSize, isUc, 0L, size, false, inSnapshot, feInfo); if (loc == null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/31f11713/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java index 9fc9def..e24bb2f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstructionContiguous; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp; import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; @@ -79,11 +80,11 @@ final class FSDirTruncateOp { try { src = fsd.resolvePath(pc, srcArg, pathComponents); iip = fsd.getINodesInPath4Write(src, true); - if (fsn.isPermissionEnabled()) { + if (fsd.isPermissionEnabled()) { fsd.checkPathAccess(pc, iip, FsAction.WRITE); } INodeFile file = INodeFile.valueOf(iip.getLastINode(), src); - final BlockStoragePolicy lpPolicy = fsn.getBlockManager() + final BlockStoragePolicy lpPolicy = fsd.getBlockManager() .getStoragePolicy("LAZY_PERSIST"); if (lpPolicy != null && lpPolicy.getId() == file.getStoragePolicyID()) { @@ -178,7 +179,7 @@ final class FSDirTruncateOp { "Should be the same block."; if (oldBlock.getBlockId() != tBlk.getBlockId() && !file.isBlockInLatestSnapshot(oldBlock)) { - fsn.getBlockManager().removeBlockFromMap(oldBlock); + fsd.getBlockManager().removeBlockFromMap(oldBlock); } } assert onBlockBoundary == (truncateBlock == null) : @@ -223,6 +224,7 @@ final class FSDirTruncateOp { } BlockInfoUnderConstruction truncatedBlockUC; + BlockManager blockManager = fsn.getFSDirectory().getBlockManager(); if (shouldCopyOnTruncate) { // Add new truncateBlock into blocksMap and // use oldBlock as a source for copy-on-truncate recovery @@ -230,9 +232,8 @@ final class FSDirTruncateOp { file.getPreferredBlockReplication()); truncatedBlockUC.setNumBytes(oldBlock.getNumBytes() - lastBlockDelta); truncatedBlockUC.setTruncateBlock(oldBlock); - file.setLastBlock(truncatedBlockUC, - fsn.getBlockManager().getStorages(oldBlock)); - fsn.getBlockManager().addBlockCollection(truncatedBlockUC, file); + file.setLastBlock(truncatedBlockUC, blockManager.getStorages(oldBlock)); + blockManager.addBlockCollection(truncatedBlockUC, file); NameNode.stateChangeLog.debug( "BLOCK* prepareFileForTruncate: Scheduling copy-on-truncate to new" @@ -241,8 +242,7 @@ final class FSDirTruncateOp { truncatedBlockUC.getTruncateBlock()); } else { // Use new generation stamp for in-place truncate recovery - fsn.getBlockManager().convertLastBlockToUnderConstruction(file, - lastBlockDelta); + blockManager.convertLastBlockToUnderConstruction(file, lastBlockDelta); oldBlock = file.getLastBlock(); assert !oldBlock.isComplete() : "oldBlock should be under construction"; truncatedBlockUC = (BlockInfoUnderConstruction) oldBlock; http://git-wip-us.apache.org/repos/asf/hadoop/blob/31f11713/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java index 4830d5d..008a945 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java @@ -206,8 +206,8 @@ class FSDirWriteFileOp { DatanodeStorageInfo[] locs, long offset) throws IOException { LocatedBlock lBlk = BlockManager.newLocatedBlock(fsn.getExtendedBlock(blk), locs, offset, false); - fsn.getBlockManager().setBlockToken(lBlk, - BlockTokenIdentifier.AccessMode.WRITE); + fsn.getFSDirectory().getBlockManager() + .setBlockToken(lBlk, BlockTokenIdentifier.AccessMode.WRITE); return lBlk; } @@ -426,7 +426,7 @@ class FSDirWriteFileOp { fsd.setFileEncryptionInfo(src, feInfo); newNode = fsd.getInode(newNode.getId()).asFile(); } - setNewINodeStoragePolicy(fsn.getBlockManager(), newNode, iip, + setNewINodeStoragePolicy(fsd.getBlockManager(), newNode, iip, isLazyPersist); fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry); if (NameNode.stateChangeLog.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/31f11713/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index 63ef985..357684a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -392,7 +392,7 @@ public class FSEditLogLoader { FSNamesystem.LOG.debug("Reopening an already-closed file " + "for append"); } - LocatedBlock lb = fsNamesys.prepareFileForAppend(path, iip, + LocatedBlock lb = FSDirAppendOp.prepareFileForAppend(fsNamesys, iip, addCloseOp.clientName, addCloseOp.clientMachine, false, false, false); // add the op into retry cache if necessary @@ -466,7 +466,7 @@ public class FSEditLogLoader { INodesInPath iip = fsDir.getINodesInPath4Write(path); INodeFile file = INodeFile.valueOf(iip.getLastINode(), path); if (!file.isUnderConstruction()) { - LocatedBlock lb = fsNamesys.prepareFileForAppend(path, iip, + LocatedBlock lb = FSDirAppendOp.prepareFileForAppend(fsNamesys, iip, appendOp.clientName, appendOp.clientMachine, appendOp.newBlock, false, false); // add the op into retry cache if necessary http://git-wip-us.apache.org/repos/asf/hadoop/blob/31f11713/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index fd37fbe..0b44431 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -142,7 +142,6 @@ import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries; import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; -import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsServerDefaults; @@ -185,7 +184,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; -import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.protocol.RollingUpgradeException; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; @@ -250,7 +248,6 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; -import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RetriableException; @@ -2174,175 +2171,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } /** - * Append to an existing file for append. - *

- * - * The method returns the last block of the file if this is a partial block, - * which can still be used for writing more data. The client uses the returned - * block locations to form the data pipeline for this block.
- * The method returns null if the last block is full. The client then - * allocates a new block with the next call using - * {@link ClientProtocol#addBlock}. - *

- * - * For description of parameters and exceptions thrown see - * {@link ClientProtocol#append(String, String, EnumSetWritable)} - * - * @return the last block locations if the block is partial or null otherwise - */ - private LocatedBlock appendFileInternal(FSPermissionChecker pc, - INodesInPath iip, String holder, String clientMachine, boolean newBlock, - boolean logRetryCache) throws IOException { - assert hasWriteLock(); - // Verify that the destination does not exist as a directory already. - final INode inode = iip.getLastINode(); - final String src = iip.getPath(); - if (inode != null && inode.isDirectory()) { - throw new FileAlreadyExistsException("Cannot append to directory " + src - + "; already exists as a directory."); - } - if (isPermissionEnabled) { - dir.checkPathAccess(pc, iip, FsAction.WRITE); - } - - try { - if (inode == null) { - throw new FileNotFoundException("failed to append to non-existent file " - + src + " for client " + clientMachine); - } - INodeFile myFile = INodeFile.valueOf(inode, src, true); - final BlockStoragePolicy lpPolicy = - blockManager.getStoragePolicy("LAZY_PERSIST"); - if (lpPolicy != null && - lpPolicy.getId() == myFile.getStoragePolicyID()) { - throw new UnsupportedOperationException( - "Cannot append to lazy persist file " + src); - } - // Opening an existing file for append - may need to recover lease. - recoverLeaseInternal(RecoverLeaseOp.APPEND_FILE, iip, src, holder, - clientMachine, false); - - final BlockInfo lastBlock = myFile.getLastBlock(); - // Check that the block has at least minimum replication. - if(lastBlock != null && lastBlock.isComplete() && - !getBlockManager().isSufficientlyReplicated(lastBlock)) { - throw new IOException("append: lastBlock=" + lastBlock + - " of src=" + src + " is not sufficiently replicated yet."); - } - return prepareFileForAppend(src, iip, holder, clientMachine, newBlock, - true, logRetryCache); - } catch (IOException ie) { - NameNode.stateChangeLog.warn("DIR* NameSystem.append: " +ie.getMessage()); - throw ie; - } - } - - /** - * Convert current node to under construction. - * Recreate in-memory lease record. - * - * @param src path to the file - * @param leaseHolder identifier of the lease holder on this file - * @param clientMachine identifier of the client machine - * @param newBlock if the data is appended to a new block - * @param writeToEditLog whether to persist this change to the edit log - * @param logRetryCache whether to record RPC ids in editlog for retry cache - * rebuilding - * @return the last block locations if the block is partial or null otherwise - * @throws UnresolvedLinkException - * @throws IOException - */ - LocatedBlock prepareFileForAppend(String src, INodesInPath iip, - String leaseHolder, String clientMachine, boolean newBlock, - boolean writeToEditLog, boolean logRetryCache) throws IOException { - final INodeFile file = iip.getLastINode().asFile(); - final QuotaCounts delta = verifyQuotaForUCBlock(file, iip); - - file.recordModification(iip.getLatestSnapshotId()); - file.toUnderConstruction(leaseHolder, clientMachine); - - leaseManager.addLease( - file.getFileUnderConstructionFeature().getClientName(), file.getId()); - - LocatedBlock ret = null; - if (!newBlock) { - ret = blockManager.convertLastBlockToUnderConstruction(file, 0); - if (ret != null && delta != null) { - Preconditions.checkState(delta.getStorageSpace() >= 0, - "appending to a block with size larger than the preferred block size"); - dir.writeLock(); - try { - dir.updateCountNoQuotaCheck(iip, iip.length() - 1, delta); - } finally { - dir.writeUnlock(); - } - } - } else { - BlockInfo lastBlock = file.getLastBlock(); - if (lastBlock != null) { - ExtendedBlock blk = new ExtendedBlock(this.getBlockPoolId(), lastBlock); - ret = new LocatedBlock(blk, new DatanodeInfo[0]); - } - } - - if (writeToEditLog) { - if (NameNodeLayoutVersion.supports(Feature.APPEND_NEW_BLOCK, - getEffectiveLayoutVersion())) { - getEditLog().logAppendFile(src, file, newBlock, logRetryCache); - } else { - getEditLog().logOpenFile(src, file, false, logRetryCache); - } - } - return ret; - } - - /** - * Verify quota when using the preferred block size for UC block. This is - * usually used by append and truncate - * @throws QuotaExceededException when violating the storage quota - * @return expected quota usage update. null means no change or no need to - * update quota usage later - */ - private QuotaCounts verifyQuotaForUCBlock(INodeFile file, INodesInPath iip) - throws QuotaExceededException { - if (!isImageLoaded() || dir.shouldSkipQuotaChecks()) { - // Do not check quota if editlog is still being processed - return null; - } - if (file.getLastBlock() != null) { - final QuotaCounts delta = computeQuotaDeltaForUCBlock(file); - dir.readLock(); - try { - FSDirectory.verifyQuota(iip, iip.length() - 1, delta, null); - return delta; - } finally { - dir.readUnlock(); - } - } - return null; - } - - /** Compute quota change for converting a complete block to a UC block */ - private QuotaCounts computeQuotaDeltaForUCBlock(INodeFile file) { - final QuotaCounts delta = new QuotaCounts.Builder().build(); - final BlockInfo lastBlock = file.getLastBlock(); - if (lastBlock != null) { - final long diff = file.getPreferredBlockSize() - lastBlock.getNumBytes(); - final short repl = file.getPreferredBlockReplication(); - delta.addStorageSpace(diff * repl); - final BlockStoragePolicy policy = dir.getBlockStoragePolicySuite() - .getPolicy(file.getStoragePolicyID()); - List types = policy.chooseStorageTypes(repl); - for (StorageType t : types) { - if (t.supportTypeQuota()) { - delta.addTypeSpace(t, diff); - } - } - } - return delta; - } - - /** * Recover lease; * Immediately revoke the lease of the current lease holder and start lease * recovery so that the file can be forced to be closed. @@ -2487,62 +2315,45 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, /** * Append to an existing file in the namespace. */ - LastBlockWithStatus appendFile(String src, String holder, + LastBlockWithStatus appendFile(String srcArg, String holder, String clientMachine, EnumSet flag, boolean logRetryCache) throws IOException { boolean newBlock = flag.contains(CreateFlag.NEW_BLOCK); if (newBlock) { requireEffectiveLayoutVersionForFeature(Feature.APPEND_NEW_BLOCK); } - try { - return appendFileInt(src, holder, clientMachine, newBlock, logRetryCache); - } catch (AccessControlException e) { - logAuditEvent(false, "append", src); - throw e; - } - } - private LastBlockWithStatus appendFileInt(final String srcArg, String holder, - String clientMachine, boolean newBlock, boolean logRetryCache) - throws IOException { - String src = srcArg; NameNode.stateChangeLog.debug( "DIR* NameSystem.appendFile: src={}, holder={}, clientMachine={}", - src, holder, clientMachine); - boolean skipSync = false; - LocatedBlock lb = null; - HdfsFileStatus stat = null; - FSPermissionChecker pc = getPermissionChecker(); - byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); - writeLock(); + srcArg, holder, clientMachine); try { + boolean skipSync = false; + LastBlockWithStatus lbs = null; + final FSPermissionChecker pc = getPermissionChecker(); checkOperation(OperationCategory.WRITE); - checkNameNodeSafeMode("Cannot append to file" + src); - src = dir.resolvePath(pc, src, pathComponents); - final INodesInPath iip = dir.getINodesInPath4Write(src); - lb = appendFileInternal(pc, iip, holder, clientMachine, newBlock, - logRetryCache); - stat = FSDirStatAndListingOp.getFileInfo(dir, src, false, - FSDirectory.isReservedRawName(srcArg), true); - } catch (StandbyException se) { - skipSync = true; - throw se; - } finally { - writeUnlock(); - // There might be transactions logged while trying to recover the lease. - // They need to be sync'ed even when an exception was thrown. - if (!skipSync) { - getEditLog().logSync(); + writeLock(); + try { + checkOperation(OperationCategory.WRITE); + checkNameNodeSafeMode("Cannot append to file" + srcArg); + lbs = FSDirAppendOp.appendFile(this, srcArg, pc, holder, clientMachine, + newBlock, logRetryCache); + } catch (StandbyException se) { + skipSync = true; + throw se; + } finally { + writeUnlock(); + // There might be transactions logged while trying to recover the lease + // They need to be sync'ed even when an exception was thrown. + if (!skipSync) { + getEditLog().logSync(); + } } + logAuditEvent(true, "append", srcArg); + return lbs; + } catch (AccessControlException e) { + logAuditEvent(false, "append", srcArg); + throw e; } - if (lb != null) { - NameNode.stateChangeLog.debug( - "DIR* NameSystem.appendFile: file {} for {} at {} block {} block" + - " size {}", src, holder, clientMachine, lb.getBlock(), - lb.getBlock().getNumBytes()); - } - logAuditEvent(true, "append", srcArg); - return new LastBlockWithStatus(lb, stat); } ExtendedBlock getExtendedBlock(Block blk) {