hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiten...@apache.org
Subject [10/50] [abbrv] hadoop git commit: HDFS-8394. Move getAdditionalBlock() and related functionalities into a separate class. Contributed by Haohui Mai.
Date Wed, 20 May 2015 23:48:34 GMT
HDFS-8394. Move getAdditionalBlock() and related functionalities into a separate class. Contributed by Haohui Mai.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e5afac58
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e5afac58
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e5afac58

Branch: refs/heads/HDFS-7240
Commit: e5afac5896a1a88e152746598527d91f73cbb724
Parents: 8f37873
Author: Haohui Mai <wheat9@apache.org>
Authored: Fri May 15 19:09:59 2015 -0700
Committer: Haohui Mai <wheat9@apache.org>
Committed: Fri May 15 19:09:59 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hdfs/server/namenode/FSDirWriteFileOp.java  | 563 +++++++++++++++++++
 .../hdfs/server/namenode/FSDirectory.java       |  78 +--
 .../hdfs/server/namenode/FSEditLogLoader.java   |   3 +-
 .../hdfs/server/namenode/FSNamesystem.java      | 497 ++--------------
 .../hdfs/server/namenode/NameNodeRpcServer.java |  30 +-
 .../hdfs/server/namenode/TestAddBlockRetry.java |  30 +-
 .../TestCommitBlockSynchronization.java         |   3 +
 8 files changed, 648 insertions(+), 559 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5afac58/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 35e81f9..4a33987 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -557,6 +557,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8397. Refactor the error handling code in DataStreamer.
     (Tsz Wo Nicholas Sze via jing9)
 
+    HDFS-8394. Move getAdditionalBlock() and related functionalities into a
+    separate class. (wheat9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5afac58/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
new file mode 100644
index 0000000..1ff0899
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@ -0,0 +1,563 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+class FSDirWriteFileOp {
+  private FSDirWriteFileOp() {}
+  static boolean unprotectedRemoveBlock(
+      FSDirectory fsd, String path, INodesInPath iip, INodeFile fileNode,
+      Block block) throws IOException {
+    // modify file-> block and blocksMap
+    // fileNode should be under construction
+    BlockInfoContiguousUnderConstruction uc = fileNode.removeLastBlock(block);
+    if (uc == null) {
+      return false;
+    }
+    fsd.getBlockManager().removeBlockFromMap(block);
+
+    if(NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* FSDirectory.removeBlock: "
+          +path+" with "+block
+          +" block is removed from the file system");
+    }
+
+    // update space consumed
+    fsd.updateCount(iip, 0, -fileNode.getPreferredBlockSize(),
+                    fileNode.getPreferredBlockReplication(), true);
+    return true;
+  }
+
+  /**
+   * Persist the block list for the inode.
+   */
+  static void persistBlocks(
+      FSDirectory fsd, String path, INodeFile file, boolean logRetryCache) {
+    assert fsd.getFSNamesystem().hasWriteLock();
+    Preconditions.checkArgument(file.isUnderConstruction());
+    fsd.getEditLog().logUpdateBlocks(path, file, logRetryCache);
+    if(NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("persistBlocks: " + path
+              + " with " + file.getBlocks().length + " blocks is persisted to" +
+              " the file system");
+    }
+  }
+
+  static void abandonBlock(
+      FSDirectory fsd, FSPermissionChecker pc, ExtendedBlock b, long fileId,
+      String src, String holder) throws IOException {
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    src = fsd.resolvePath(pc, src, pathComponents);
+
+    final INode inode;
+    final INodesInPath iip;
+    if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
+      // Older clients may not have given us an inode ID to work with.
+      // In this case, we have to try to resolve the path and hope it
+      // hasn't changed or been deleted since the file was opened for write.
+      iip = fsd.getINodesInPath(src, true);
+      inode = iip.getLastINode();
+    } else {
+      inode = fsd.getInode(fileId);
+      iip = INodesInPath.fromINode(inode);
+      if (inode != null) {
+        src = iip.getPath();
+      }
+    }
+    FSNamesystem fsn = fsd.getFSNamesystem();
+    final INodeFile file = fsn.checkLease(src, holder, inode, fileId);
+    Preconditions.checkState(file.isUnderConstruction());
+
+    Block localBlock = ExtendedBlock.getLocalBlock(b);
+    fsd.writeLock();
+    try {
+      // Remove the block from the pending creates list
+      if (!unprotectedRemoveBlock(fsd, src, iip, file, localBlock)) {
+        return;
+      }
+    } finally {
+      fsd.writeUnlock();
+    }
+    persistBlocks(fsd, src, file, false);
+  }
+
+  static void checkBlock(FSNamesystem fsn, ExtendedBlock block)
+      throws IOException {
+    String bpId = fsn.getBlockPoolId();
+    if (block != null && !bpId.equals(block.getBlockPoolId())) {
+      throw new IOException("Unexpected BlockPoolId " + block.getBlockPoolId()
+          + " - expected " + bpId);
+    }
+  }
+
+  /**
+   * Part I of getAdditionalBlock().
+   * Analyze the state of the file under read lock to determine if the client
+   * can add a new block, detect potential retries, lease mismatches,
+   * and minimal replication of the penultimate block.
+   *
+   * Generate target DataNode locations for the new block,
+   * but do not create the new block yet.
+   */
+  static ValidateAddBlockResult validateAddBlock(
+      FSNamesystem fsn, FSPermissionChecker pc,
+      String src, long fileId, String clientName,
+      ExtendedBlock previous, LocatedBlock[] onRetryBlock) throws IOException {
+    final long blockSize;
+    final int replication;
+    final byte storagePolicyID;
+    String clientMachine;
+
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    src = fsn.dir.resolvePath(pc, src, pathComponents);
+    FileState fileState = analyzeFileState(fsn, src, fileId, clientName,
+                                           previous, onRetryBlock);
+    final INodeFile pendingFile = fileState.inode;
+    // Check if the penultimate block is minimally replicated
+    if (!fsn.checkFileProgress(src, pendingFile, false)) {
+      throw new NotReplicatedYetException("Not replicated yet: " + src);
+    }
+
+    if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) {
+      // This is a retry. No need to generate new locations.
+      // Use the last block if it has locations.
+      return null;
+    }
+    if (pendingFile.getBlocks().length >= fsn.maxBlocksPerFile) {
+      throw new IOException("File has reached the limit on maximum number of"
+          + " blocks (" + DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY
+          + "): " + pendingFile.getBlocks().length + " >= "
+          + fsn.maxBlocksPerFile);
+    }
+    blockSize = pendingFile.getPreferredBlockSize();
+    clientMachine = pendingFile.getFileUnderConstructionFeature()
+        .getClientMachine();
+    replication = pendingFile.getFileReplication();
+    storagePolicyID = pendingFile.getStoragePolicyID();
+    return new ValidateAddBlockResult(blockSize, replication, storagePolicyID,
+                                    clientMachine);
+  }
+
+  static LocatedBlock makeLocatedBlock(FSNamesystem fsn, Block blk,
+      DatanodeStorageInfo[] locs, long offset) throws IOException {
+    LocatedBlock lBlk = BlockManager.newLocatedBlock(fsn.getExtendedBlock(blk),
+                                                     locs, offset, false);
+    fsn.getBlockManager().setBlockToken(lBlk,
+                                        BlockTokenIdentifier.AccessMode.WRITE);
+    return lBlk;
+  }
+
+  /**
+   * Part II of getAdditionalBlock().
+   * Should repeat the same analysis of the file state as in Part I,
+   * but under the write lock.
+   * If the conditions still hold, then allocate a new block with
+   * the new targets, add it to the INode and to the BlocksMap.
+   */
+  static LocatedBlock storeAllocatedBlock(FSNamesystem fsn, String src,
+      long fileId, String clientName, ExtendedBlock previous,
+      DatanodeStorageInfo[] targets) throws IOException {
+    long offset;
+    // Run the full analysis again, since things could have changed
+    // while chooseTarget() was executing.
+    LocatedBlock[] onRetryBlock = new LocatedBlock[1];
+    FileState fileState = analyzeFileState(fsn, src, fileId, clientName,
+                                           previous, onRetryBlock);
+    final INodeFile pendingFile = fileState.inode;
+    src = fileState.path;
+
+    if (onRetryBlock[0] != null) {
+      if (onRetryBlock[0].getLocations().length > 0) {
+        // This is a retry. Return the last block as it already has locations.
+        return onRetryBlock[0];
+      } else {
+        // Add the newly chosen targets to the already allocated block, return.
+        BlockInfoContiguous lastBlockInFile = pendingFile.getLastBlock();
+        ((BlockInfoContiguousUnderConstruction) lastBlockInFile)
+            .setExpectedLocations(targets);
+        offset = pendingFile.computeFileSize();
+        return makeLocatedBlock(fsn, lastBlockInFile, targets, offset);
+      }
+    }
+
+    // commit the last block and complete it if it has minimum replicas
+    fsn.commitOrCompleteLastBlock(pendingFile, fileState.iip,
+                                  ExtendedBlock.getLocalBlock(previous));
+
+    // allocate new block, record block locations in INode.
+    Block newBlock = fsn.createNewBlock();
+    INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile);
+    saveAllocatedBlock(fsn, src, inodesInPath, newBlock, targets);
+
+    persistNewBlock(fsn, src, pendingFile);
+    offset = pendingFile.computeFileSize();
+
+    // Return located block
+    return makeLocatedBlock(fsn, newBlock, targets, offset);
+  }
+
+  static DatanodeStorageInfo[] chooseTargetForNewBlock(
+      BlockManager bm, String src, DatanodeInfo[] excludedNodes, String[]
+      favoredNodes, ValidateAddBlockResult r) throws IOException {
+    Node clientNode = bm.getDatanodeManager()
+        .getDatanodeByHost(r.clientMachine);
+    if (clientNode == null) {
+      clientNode = getClientNode(bm, r.clientMachine);
+    }
+
+    Set<Node> excludedNodesSet = null;
+    if (excludedNodes != null) {
+      excludedNodesSet = new HashSet<>(excludedNodes.length);
+      Collections.addAll(excludedNodesSet, excludedNodes);
+    }
+    List<String> favoredNodesList = (favoredNodes == null) ? null
+        : Arrays.asList(favoredNodes);
+
+    // choose targets for the new block to be allocated.
+    return bm.chooseTarget4NewBlock(src, r.replication, clientNode,
+                                    excludedNodesSet, r.blockSize,
+                                    favoredNodesList, r.storagePolicyID);
+  }
+
+  /**
+   * Resolve clientMachine to a Node at its network location (null on failure).
+   */
+  static Node getClientNode(BlockManager bm, String clientMachine) {
+    List<String> hosts = new ArrayList<>(1);
+    hosts.add(clientMachine);
+    List<String> rName = bm.getDatanodeManager()
+        .resolveNetworkLocation(hosts);
+    Node clientNode = null;
+    if (rName != null) {
+      // Able to resolve clientMachine mapping.
+      // Create a temp node to find out the rack local nodes
+      clientNode = new NodeBase(rName.get(0) + NodeBase.PATH_SEPARATOR_STR
+          + clientMachine);
+    }
+    return clientNode;
+  }
+
+  /**
+   * Add a block to the file. Returns a reference to the added block.
+   */
+  private static BlockInfoContiguous addBlock(
+      FSDirectory fsd, String path, INodesInPath inodesInPath, Block block,
+      DatanodeStorageInfo[] targets) throws IOException {
+    fsd.writeLock();
+    try {
+      final INodeFile fileINode = inodesInPath.getLastINode().asFile();
+      Preconditions.checkState(fileINode.isUnderConstruction());
+
+      // check quota limits and update space consumed
+      fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
+          fileINode.getPreferredBlockReplication(), true);
+
+      // associate new last block for the file
+      BlockInfoContiguousUnderConstruction blockInfo =
+        new BlockInfoContiguousUnderConstruction(
+            block,
+            fileINode.getFileReplication(),
+            HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION,
+            targets);
+      fsd.getBlockManager().addBlockCollection(blockInfo, fileINode);
+      fileINode.addBlock(blockInfo);
+
+      if(NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug("DIR* FSDirectory.addBlock: "
+            + path + " with " + block
+            + " block is added to the in-memory "
+            + "file system");
+      }
+      return blockInfo;
+    } finally {
+      fsd.writeUnlock();
+    }
+  }
+
+  private static FileState analyzeFileState(
+      FSNamesystem fsn, String src, long fileId, String clientName,
+      ExtendedBlock previous, LocatedBlock[] onRetryBlock)
+          throws IOException  {
+    assert fsn.hasReadLock();
+
+    checkBlock(fsn, previous);
+    onRetryBlock[0] = null;
+    fsn.checkNameNodeSafeMode("Cannot add block to " + src);
+
+    // have we exceeded the configured limit of fs objects.
+    fsn.checkFsObjectLimit();
+
+    Block previousBlock = ExtendedBlock.getLocalBlock(previous);
+    final INode inode;
+    final INodesInPath iip;
+    if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
+      // Older clients may not have given us an inode ID to work with.
+      // In this case, we have to try to resolve the path and hope it
+      // hasn't changed or been deleted since the file was opened for write.
+      iip = fsn.dir.getINodesInPath4Write(src);
+      inode = iip.getLastINode();
+    } else {
+      // Newer clients pass the inode ID, so we can just get the inode
+      // directly.
+      inode = fsn.dir.getInode(fileId);
+      iip = INodesInPath.fromINode(inode);
+      if (inode != null) {
+        src = iip.getPath();
+      }
+    }
+    final INodeFile file = fsn.checkLease(src, clientName,
+                                                 inode, fileId);
+    BlockInfoContiguous lastBlockInFile = file.getLastBlock();
+    if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
+      // The block that the client claims is the current last block
+      // doesn't match up with what we think is the last block. There are
+      // four possibilities:
+      // 1) This is the first block allocation of an append() pipeline
+      //    which started appending exactly at or exceeding the block boundary.
+      //    In this case, the client isn't passed the previous block,
+      //    so it makes the allocateBlock() call with previous=null.
+      //    We can distinguish this since the last block of the file
+      //    will be exactly a full block.
+      // 2) This is a retry from a client that missed the response of a
+      //    prior getAdditionalBlock() call, perhaps because of a network
+      //    timeout, or because of an HA failover. In that case, we know
+      //    by the fact that the client is re-issuing the RPC that it
+      //    never began to write to the old block. Hence it is safe to
+      //    return the existing block.
+      // 3) This is an entirely bogus request/bug -- we should error out
+      //    rather than potentially appending a new block with an empty
+      //    one in the middle, etc
+      // 4) This is a retry from a client that timed out while
+      //    the prior getAdditionalBlock() is still being processed,
+      //    currently working on chooseTarget().
+      //    There are no means to distinguish between the first and
+      //    the second attempts in Part I, because the first one hasn't
+      //    changed the namesystem state yet.
+      //    We run this analysis again in Part II where case 4 is impossible.
+
+      BlockInfoContiguous penultimateBlock = file.getPenultimateBlock();
+      if (previous == null &&
+          lastBlockInFile != null &&
+          lastBlockInFile.getNumBytes() >= file.getPreferredBlockSize() &&
+          lastBlockInFile.isComplete()) {
+        // Case 1
+        if (NameNode.stateChangeLog.isDebugEnabled()) {
+           NameNode.stateChangeLog.debug(
+               "BLOCK* NameSystem.allocateBlock: handling block allocation" +
+               " writing to a file with a complete previous block: src=" +
+               src + " lastBlock=" + lastBlockInFile);
+        }
+      } else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) {
+        if (lastBlockInFile.getNumBytes() != 0) {
+          throw new IOException(
+              "Request looked like a retry to allocate block " +
+              lastBlockInFile + " but it already contains " +
+              lastBlockInFile.getNumBytes() + " bytes");
+        }
+
+        // Case 2
+        // Return the last block.
+        NameNode.stateChangeLog.info("BLOCK* allocateBlock: caught retry for " +
+            "allocation of a new block in " + src + ". Returning previously" +
+            " allocated block " + lastBlockInFile);
+        long offset = file.computeFileSize();
+        BlockInfoContiguousUnderConstruction lastBlockUC =
+            (BlockInfoContiguousUnderConstruction) lastBlockInFile;
+        onRetryBlock[0] = makeLocatedBlock(fsn, lastBlockInFile,
+            lastBlockUC.getExpectedStorageLocations(), offset);
+        return new FileState(file, src, iip);
+      } else {
+        // Case 3
+        throw new IOException("Cannot allocate block in " + src + ": " +
+            "passed 'previous' block " + previous + " does not match actual " +
+            "last block in file " + lastBlockInFile);
+      }
+    }
+    return new FileState(file, src, iip);
+  }
+
+  static boolean completeFile(FSNamesystem fsn, FSPermissionChecker pc,
+      final String srcArg, String holder, ExtendedBlock last, long fileId)
+      throws IOException {
+    String src = srcArg;
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " +
+                                        src + " for " + holder);
+    }
+    checkBlock(fsn, last);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    src = fsn.dir.resolvePath(pc, src, pathComponents);
+    boolean success = completeFileInternal(fsn, src, holder,
+                                           ExtendedBlock.getLocalBlock(last),
+                                           fileId);
+    if (success) {
+      NameNode.stateChangeLog.info("DIR* completeFile: " + srcArg
+                                       + " is closed by " + holder);
+    }
+    return success;
+  }
+
+  private static boolean completeFileInternal(
+      FSNamesystem fsn, String src, String holder, Block last, long fileId)
+      throws IOException {
+    assert fsn.hasWriteLock();
+    final INodeFile pendingFile;
+    final INodesInPath iip;
+    INode inode = null;
+    try {
+      if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
+        // Older clients may not have given us an inode ID to work with.
+        // In this case, we have to try to resolve the path and hope it
+        // hasn't changed or been deleted since the file was opened for write.
+        iip = fsn.dir.getINodesInPath(src, true);
+        inode = iip.getLastINode();
+      } else {
+        inode = fsn.dir.getInode(fileId);
+        iip = INodesInPath.fromINode(inode);
+        if (inode != null) {
+          src = iip.getPath();
+        }
+      }
+      pendingFile = fsn.checkLease(src, holder, inode, fileId);
+    } catch (LeaseExpiredException lee) {
+      if (inode != null && inode.isFile() &&
+          !inode.asFile().isUnderConstruction()) {
+        // This could be a retry RPC - i.e the client tried to close
+        // the file, but missed the RPC response. Thus, it is trying
+        // again to close the file. If the file still exists and
+        // the client's view of the last block matches the actual
+        // last block, then we'll treat it as a successful close.
+        // See HDFS-3031.
+        final Block realLastBlock = inode.asFile().getLastBlock();
+        if (Block.matchingIdAndGenStamp(last, realLastBlock)) {
+          NameNode.stateChangeLog.info("DIR* completeFile: " +
+              "request from " + holder + " to complete inode " + fileId +
+              "(" + src + ") which is already closed. But, it appears to be " +
+              "an RPC retry. Returning success");
+          return true;
+        }
+      }
+      throw lee;
+    }
+    // Check the state of the penultimate block. It should be completed
+    // before attempting to complete the last one.
+    if (!fsn.checkFileProgress(src, pendingFile, false)) {
+      return false;
+    }
+
+    // commit the last block and complete it if it has minimum replicas
+    fsn.commitOrCompleteLastBlock(pendingFile, iip, last);
+
+    if (!fsn.checkFileProgress(src, pendingFile, true)) {
+      return false;
+    }
+
+    fsn.finalizeINodeFileUnderConstruction(src, pendingFile,
+        Snapshot.CURRENT_STATE_ID);
+    return true;
+  }
+
+  /**
+   * Persist the new block (the last block of the given file).
+   */
+  private static void persistNewBlock(
+      FSNamesystem fsn, String path, INodeFile file) {
+    Preconditions.checkArgument(file.isUnderConstruction());
+    fsn.getEditLog().logAddBlock(path, file);
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("persistNewBlock: "
+              + path + " with new block " + file.getLastBlock().toString()
+              + ", current total block count is " + file.getBlocks().length);
+    }
+  }
+
+  /**
+   * Save the allocated block at the given pending filename.
+   *
+   * @param fsn FSNamesystem
+   * @param src path to the file
+   * @param inodesInPath representing each of the components of src.
+   *                     The last INode is the INode for {@code src} file.
+   * @param newBlock newly allocated block to be saved
+   * @param targets target datanodes where replicas of the new block are placed
+   * @throws QuotaExceededException If addition of block exceeds space quota
+   */
+  private static void saveAllocatedBlock(
+      FSNamesystem fsn, String src, INodesInPath inodesInPath, Block newBlock,
+      DatanodeStorageInfo[] targets)
+      throws IOException {
+    assert fsn.hasWriteLock();
+    BlockInfoContiguous b = addBlock(fsn.dir, src, inodesInPath, newBlock,
+                                     targets);
+    NameNode.stateChangeLog.info("BLOCK* allocate " + b + " for " + src);
+    DatanodeStorageInfo.incrementBlocksScheduled(targets);
+  }
+
+  private static class FileState {
+    final INodeFile inode;
+    final String path;
+    final INodesInPath iip;
+
+    FileState(INodeFile inode, String fullPath, INodesInPath iip) {
+      this.inode = inode;
+      this.path = fullPath;
+      this.iip = iip;
+    }
+  }
+
+  static class ValidateAddBlockResult {
+    final long blockSize;
+    final int replication;
+    final byte storagePolicyID;
+    final String clientMachine;
+
+    ValidateAddBlockResult(
+        long blockSize, int replication, byte storagePolicyID,
+        String clientMachine) {
+      this.blockSize = blockSize;
+      this.replication = replication;
+      this.storagePolicyID = storagePolicyID;
+      this.clientMachine = clientMachine;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5afac58/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 1583815..c2ed956 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -55,12 +55,9 @@ import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.util.ByteArray;
 import org.apache.hadoop.hdfs.util.EnumCounters;
@@ -308,7 +305,7 @@ public class FSDirectory implements Closeable {
     return namesystem;
   }
 
-  private BlockManager getBlockManager() {
+  BlockManager getBlockManager() {
     return getFSNamesystem().getBlockManager();
   }
 
@@ -479,79 +476,6 @@ public class FSDirectory implements Closeable {
   }
 
   /**
-   * Add a block to the file. Returns a reference to the added block.
-   */
-  BlockInfoContiguous addBlock(String path, INodesInPath inodesInPath,
-      Block block, DatanodeStorageInfo[] targets) throws IOException {
-    writeLock();
-    try {
-      final INodeFile fileINode = inodesInPath.getLastINode().asFile();
-      Preconditions.checkState(fileINode.isUnderConstruction());
-
-      // check quota limits and updated space consumed
-      updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
-          fileINode.getPreferredBlockReplication(), true);
-
-      // associate new last block for the file
-      BlockInfoContiguousUnderConstruction blockInfo =
-        new BlockInfoContiguousUnderConstruction(
-            block,
-            fileINode.getFileReplication(),
-            BlockUCState.UNDER_CONSTRUCTION,
-            targets);
-      getBlockManager().addBlockCollection(blockInfo, fileINode);
-      fileINode.addBlock(blockInfo);
-
-      if(NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug("DIR* FSDirectory.addBlock: "
-            + path + " with " + block
-            + " block is added to the in-memory "
-            + "file system");
-      }
-      return blockInfo;
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  /**
-   * Remove a block from the file.
-   * @return Whether the block exists in the corresponding file
-   */
-  boolean removeBlock(String path, INodesInPath iip, INodeFile fileNode,
-      Block block) throws IOException {
-    Preconditions.checkArgument(fileNode.isUnderConstruction());
-    writeLock();
-    try {
-      return unprotectedRemoveBlock(path, iip, fileNode, block);
-    } finally {
-      writeUnlock();
-    }
-  }
-  
-  boolean unprotectedRemoveBlock(String path, INodesInPath iip,
-      INodeFile fileNode, Block block) throws IOException {
-    // modify file-> block and blocksMap
-    // fileNode should be under construction
-    BlockInfoContiguousUnderConstruction uc = fileNode.removeLastBlock(block);
-    if (uc == null) {
-      return false;
-    }
-    getBlockManager().removeBlockFromMap(block);
-
-    if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* FSDirectory.removeBlock: "
-          +path+" with "+block
-          +" block is removed from the file system");
-    }
-
-    // update space consumed
-    updateCount(iip, 0, -fileNode.getPreferredBlockSize(),
-        fileNode.getPreferredBlockReplication(), true);
-    return true;
-  }
-
-  /**
    * This is a wrapper for resolvePath(). If the path passed
    * is prefixed with /.reserved/raw, then it checks to ensure that the caller
    * has super user privileges.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5afac58/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index f75c117..dec1298 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -1037,7 +1037,8 @@ public class FSEditLogLoader {
             + path);
       }
       Block oldBlock = oldBlocks[oldBlocks.length - 1];
-      boolean removed = fsDir.unprotectedRemoveBlock(path, iip, file, oldBlock);
+      boolean removed = FSDirWriteFileOp.unprotectedRemoveBlock(
+          fsDir, path, iip, file, oldBlock);
       if (!removed && !(op instanceof UpdateBlocksOp)) {
         throw new IOException("Trying to delete non-existant block " + oldBlock);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5afac58/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 4d82fab..0fec5ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -268,7 +268,6 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
-import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@@ -484,7 +483,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   private final long maxFsObjects;          // maximum number of fs objects
 
   private final long minBlockSize;         // minimum block size
-  private final long maxBlocksPerFile;     // maximum # of blocks per file
+  final long maxBlocksPerFile;     // maximum # of blocks per file
 
   // precision of access times.
   private final long accessTimePrecision;
@@ -614,7 +613,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   boolean isHaEnabled() {
     return haEnabled;
   }
-  
+
   /**
    * Check the supplied configuration for correctness.
    * @param conf Supplies the configuration to validate.
@@ -1863,8 +1862,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
             : dir.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip);
 
     final LocatedBlocks blocks = blockManager.createLocatedBlocks(
-        inode.getBlocks(iip.getPathSnapshotId()), fileSize,
-        isUc, offset, length, needBlockToken, iip.isSnapshot(), feInfo);
+        inode.getBlocks(iip.getPathSnapshotId()), fileSize, isUc, offset,
+        length, needBlockToken, iip.isSnapshot(), feInfo);
 
     // Set caching information for the located blocks.
     for (LocatedBlock lb : blocks.getLocatedBlocks()) {
@@ -2232,8 +2231,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot set storage policy for " + src);
-      auditStat = FSDirAttrOp.setStoragePolicy(
-          dir, blockManager, src, policyName);
+      auditStat = FSDirAttrOp.setStoragePolicy(dir, blockManager, src,
+                                               policyName);
     } catch (AccessControlException e) {
       logAuditEvent(false, "setStoragePolicy", src);
       throw e;
@@ -2621,7 +2620,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       return toRemoveBlocks;
     } catch (IOException ie) {
       NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: " + src + " " +
-          ie.getMessage());
+                                       ie.getMessage());
       throw ie;
     }
   }
@@ -2703,8 +2702,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
             "Cannot append to lazy persist file " + src);
       }
       // Opening an existing file for append - may need to recover lease.
-      recoverLeaseInternal(RecoverLeaseOp.APPEND_FILE,
-          iip, src, holder, clientMachine, false);
+      recoverLeaseInternal(RecoverLeaseOp.APPEND_FILE, iip, src, holder,
+                           clientMachine, false);
       
       final BlockInfoContiguous lastBlock = myFile.getLastBlock();
       // Check that the block has at least minimum replication.
@@ -3042,290 +3041,49 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * are replicated.  Will return an empty 2-elt array if we want the
    * client to "try again later".
    */
-  LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
-      ExtendedBlock previous, Set<Node> excludedNodes, 
-      List<String> favoredNodes) throws IOException {
-    LocatedBlock[] onRetryBlock = new LocatedBlock[1];
-    DatanodeStorageInfo targets[] = getNewBlockTargets(src, fileId,
-        clientName, previous, excludedNodes, favoredNodes, onRetryBlock);
-    if (targets == null) {
-      assert onRetryBlock[0] != null : "Retry block is null";
-      // This is a retry. Just return the last block.
-      return onRetryBlock[0];
-    }
-    LocatedBlock newBlock = storeAllocatedBlock(
-        src, fileId, clientName, previous, targets);
-    return newBlock;
-  }
-
-  /**
-   * Part I of getAdditionalBlock().
-   * Analyze the state of the file under read lock to determine if the client
-   * can add a new block, detect potential retries, lease mismatches,
-   * and minimal replication of the penultimate block.
-   * 
-   * Generate target DataNode locations for the new block,
-   * but do not create the new block yet.
-   */
-  DatanodeStorageInfo[] getNewBlockTargets(String src, long fileId,
-      String clientName, ExtendedBlock previous, Set<Node> excludedNodes,
-      List<String> favoredNodes, LocatedBlock[] onRetryBlock) throws IOException {
-    final long blockSize;
-    final int replication;
-    final byte storagePolicyID;
-    Node clientNode = null;
-    String clientMachine = null;
-
+  LocatedBlock getAdditionalBlock(
+      String src, long fileId, String clientName, ExtendedBlock previous,
+      DatanodeInfo[] excludedNodes, String[] favoredNodes) throws IOException {
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: "
           + src + " inodeId " +  fileId  + " for " + clientName);
     }
 
-    checkOperation(OperationCategory.READ);
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    waitForLoadingFSImage();
+    LocatedBlock[] onRetryBlock = new LocatedBlock[1];
+    FSDirWriteFileOp.ValidateAddBlockResult r;
     FSPermissionChecker pc = getPermissionChecker();
+    checkOperation(OperationCategory.READ);
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      src = dir.resolvePath(pc, src, pathComponents);
-      FileState fileState = analyzeFileState(
-          src, fileId, clientName, previous, onRetryBlock);
-      final INodeFile pendingFile = fileState.inode;
-      // Check if the penultimate block is minimally replicated
-      if (!checkFileProgress(src, pendingFile, false)) {
-        throw new NotReplicatedYetException("Not replicated yet: " + src);
-      }
-      src = fileState.path;
-
-      if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) {
-        // This is a retry. No need to generate new locations.
-        // Use the last block if it has locations.
-        return null;
-      }
-      if (pendingFile.getBlocks().length >= maxBlocksPerFile) {
-        throw new IOException("File has reached the limit on maximum number of"
-            + " blocks (" + DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY
-            + "): " + pendingFile.getBlocks().length + " >= "
-            + maxBlocksPerFile);
-      }
-      blockSize = pendingFile.getPreferredBlockSize();
-      clientMachine = pendingFile.getFileUnderConstructionFeature()
-          .getClientMachine();
-      clientNode = blockManager.getDatanodeManager().getDatanodeByHost(
-          clientMachine);
-      replication = pendingFile.getFileReplication();
-      storagePolicyID = pendingFile.getStoragePolicyID();
+      r = FSDirWriteFileOp.validateAddBlock(this, pc, src, fileId, clientName,
+                                            previous, onRetryBlock);
     } finally {
       readUnlock();
     }
 
-    if (clientNode == null) {
-      clientNode = getClientNode(clientMachine);
+    if (r == null) {
+      assert onRetryBlock[0] != null : "Retry block is null";
+      // This is a retry. Just return the last block.
+      return onRetryBlock[0];
     }
 
-    // choose targets for the new block to be allocated.
-    return getBlockManager().chooseTarget4NewBlock( 
-        src, replication, clientNode, excludedNodes, blockSize, favoredNodes,
-        storagePolicyID);
-  }
+    DatanodeStorageInfo[] targets = FSDirWriteFileOp.chooseTargetForNewBlock(
+        blockManager, src, excludedNodes, favoredNodes, r);
 
-  /**
-   * Part II of getAdditionalBlock().
-   * Should repeat the same analysis of the file state as in Part 1,
-   * but under the write lock.
-   * If the conditions still hold, then allocate a new block with
-   * the new targets, add it to the INode and to the BlocksMap.
-   */
-  LocatedBlock storeAllocatedBlock(String src, long fileId, String clientName,
-      ExtendedBlock previous, DatanodeStorageInfo[] targets) throws IOException {
-    Block newBlock = null;
-    long offset;
     checkOperation(OperationCategory.WRITE);
-    waitForLoadingFSImage();
     writeLock();
+    LocatedBlock lb;
     try {
       checkOperation(OperationCategory.WRITE);
-      // Run the full analysis again, since things could have changed
-      // while chooseTarget() was executing.
-      LocatedBlock[] onRetryBlock = new LocatedBlock[1];
-      FileState fileState = 
-          analyzeFileState(src, fileId, clientName, previous, onRetryBlock);
-      final INodeFile pendingFile = fileState.inode;
-      src = fileState.path;
-
-      if (onRetryBlock[0] != null) {
-        if (onRetryBlock[0].getLocations().length > 0) {
-          // This is a retry. Just return the last block if having locations.
-          return onRetryBlock[0];
-        } else {
-          // add new chosen targets to already allocated block and return
-          BlockInfoContiguous lastBlockInFile = pendingFile.getLastBlock();
-          ((BlockInfoContiguousUnderConstruction) lastBlockInFile)
-              .setExpectedLocations(targets);
-          offset = pendingFile.computeFileSize();
-          return makeLocatedBlock(lastBlockInFile, targets, offset);
-        }
-      }
-
-      // commit the last block and complete it if it has minimum replicas
-      commitOrCompleteLastBlock(pendingFile, fileState.iip,
-                                ExtendedBlock.getLocalBlock(previous));
-
-      // allocate new block, record block locations in INode.
-      newBlock = createNewBlock();
-      INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile);
-      saveAllocatedBlock(src, inodesInPath, newBlock, targets);
-
-      persistNewBlock(src, pendingFile);
-      offset = pendingFile.computeFileSize();
+      lb = FSDirWriteFileOp.storeAllocatedBlock(
+          this, src, fileId, clientName, previous, targets);
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-
-    // Return located block
-    return makeLocatedBlock(newBlock, targets, offset);
-  }
-
-  /*
-   * Resolve clientmachine address to get a network location path
-   */
-  private Node getClientNode(String clientMachine) {
-    List<String> hosts = new ArrayList<String>(1);
-    hosts.add(clientMachine);
-    List<String> rName = getBlockManager().getDatanodeManager()
-        .resolveNetworkLocation(hosts);
-    Node clientNode = null;
-    if (rName != null) {
-      // Able to resolve clientMachine mapping.
-      // Create a temp node to findout the rack local nodes
-      clientNode = new NodeBase(rName.get(0) + NodeBase.PATH_SEPARATOR_STR
-          + clientMachine);
-    }
-    return clientNode;
-  }
-
-  static class FileState {
-    public final INodeFile inode;
-    public final String path;
-    public final INodesInPath iip;
-
-    public FileState(INodeFile inode, String fullPath, INodesInPath iip) {
-      this.inode = inode;
-      this.path = fullPath;
-      this.iip = iip;
-    }
-  }
-
-  FileState analyzeFileState(String src,
-                                long fileId,
-                                String clientName,
-                                ExtendedBlock previous,
-                                LocatedBlock[] onRetryBlock)
-          throws IOException  {
-    assert hasReadLock();
-
-    checkBlock(previous);
-    onRetryBlock[0] = null;
-    checkNameNodeSafeMode("Cannot add block to " + src);
-
-    // have we exceeded the configured limit of fs objects.
-    checkFsObjectLimit();
-
-    Block previousBlock = ExtendedBlock.getLocalBlock(previous);
-    final INode inode;
-    final INodesInPath iip;
-    if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
-      // Older clients may not have given us an inode ID to work with.
-      // In this case, we have to try to resolve the path and hope it
-      // hasn't changed or been deleted since the file was opened for write.
-      iip = dir.getINodesInPath4Write(src);
-      inode = iip.getLastINode();
-    } else {
-      // Newer clients pass the inode ID, so we can just get the inode
-      // directly.
-      inode = dir.getInode(fileId);
-      iip = INodesInPath.fromINode(inode);
-      if (inode != null) {
-        src = iip.getPath();
-      }
-    }
-    final INodeFile pendingFile = checkLease(src, clientName, inode, fileId);
-    BlockInfoContiguous lastBlockInFile = pendingFile.getLastBlock();
-    if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
-      // The block that the client claims is the current last block
-      // doesn't match up with what we think is the last block. There are
-      // four possibilities:
-      // 1) This is the first block allocation of an append() pipeline
-      //    which started appending exactly at or exceeding the block boundary.
-      //    In this case, the client isn't passed the previous block,
-      //    so it makes the allocateBlock() call with previous=null.
-      //    We can distinguish this since the last block of the file
-      //    will be exactly a full block.
-      // 2) This is a retry from a client that missed the response of a
-      //    prior getAdditionalBlock() call, perhaps because of a network
-      //    timeout, or because of an HA failover. In that case, we know
-      //    by the fact that the client is re-issuing the RPC that it
-      //    never began to write to the old block. Hence it is safe to
-      //    to return the existing block.
-      // 3) This is an entirely bogus request/bug -- we should error out
-      //    rather than potentially appending a new block with an empty
-      //    one in the middle, etc
-      // 4) This is a retry from a client that timed out while
-      //    the prior getAdditionalBlock() is still being processed,
-      //    currently working on chooseTarget(). 
-      //    There are no means to distinguish between the first and 
-      //    the second attempts in Part I, because the first one hasn't
-      //    changed the namesystem state yet.
-      //    We run this analysis again in Part II where case 4 is impossible.
-
-      BlockInfoContiguous penultimateBlock = pendingFile.getPenultimateBlock();
-      if (previous == null &&
-          lastBlockInFile != null &&
-          lastBlockInFile.getNumBytes() >= pendingFile.getPreferredBlockSize() &&
-          lastBlockInFile.isComplete()) {
-        // Case 1
-        if (NameNode.stateChangeLog.isDebugEnabled()) {
-           NameNode.stateChangeLog.debug(
-               "BLOCK* NameSystem.allocateBlock: handling block allocation" +
-               " writing to a file with a complete previous block: src=" +
-               src + " lastBlock=" + lastBlockInFile);
-        }
-      } else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) {
-        if (lastBlockInFile.getNumBytes() != 0) {
-          throw new IOException(
-              "Request looked like a retry to allocate block " +
-              lastBlockInFile + " but it already contains " +
-              lastBlockInFile.getNumBytes() + " bytes");
-        }
-
-        // Case 2
-        // Return the last block.
-        NameNode.stateChangeLog.info("BLOCK* allocateBlock: " +
-            "caught retry for allocation of a new block in " +
-            src + ". Returning previously allocated block " + lastBlockInFile);
-        long offset = pendingFile.computeFileSize();
-        onRetryBlock[0] = makeLocatedBlock(lastBlockInFile,
-            ((BlockInfoContiguousUnderConstruction)lastBlockInFile).getExpectedStorageLocations(),
-            offset);
-        return new FileState(pendingFile, src, iip);
-      } else {
-        // Case 3
-        throw new IOException("Cannot allocate block in " + src + ": " +
-            "passed 'previous' block " + previous + " does not match actual " +
-            "last block in file " + lastBlockInFile);
-      }
-    }
-    return new FileState(pendingFile, src, iip);
-  }
-
-  LocatedBlock makeLocatedBlock(Block blk, DatanodeStorageInfo[] locs,
-                                        long offset) throws IOException {
-    LocatedBlock lBlk = BlockManager.newLocatedBlock(
-        getExtendedBlock(blk), locs, offset, false);
-    getBlockManager().setBlockToken(
-        lBlk, BlockTokenIdentifier.AccessMode.WRITE);
-    return lBlk;
+    return lb;
   }
 
   /** @see ClientProtocol#getAdditionalDatanode */
@@ -3378,7 +3136,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
 
     if (clientnode == null) {
-      clientnode = getClientNode(clientMachine);
+      clientnode = FSDirWriteFileOp.getClientNode(blockManager, clientMachine);
     }
 
     // choose new datanodes.
@@ -3394,60 +3152,32 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   /**
    * The client would like to let go of the given block
    */
-  boolean abandonBlock(ExtendedBlock b, long fileId, String src, String holder)
+  void abandonBlock(ExtendedBlock b, long fileId, String src, String holder)
       throws IOException {
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: " + b
           + "of file " + src);
     }
+    waitForLoadingFSImage();
     checkOperation(OperationCategory.WRITE);
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     FSPermissionChecker pc = getPermissionChecker();
-    waitForLoadingFSImage();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot abandon block " + b + " for file" + src);
-      src = dir.resolvePath(pc, src, pathComponents);
-
-      final INode inode;
-      final INodesInPath iip;
-      if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
-        // Older clients may not have given us an inode ID to work with.
-        // In this case, we have to try to resolve the path and hope it
-        // hasn't changed or been deleted since the file was opened for write.
-        iip = dir.getINodesInPath(src, true);
-        inode = iip.getLastINode();
-      } else {
-        inode = dir.getInode(fileId);
-        iip = INodesInPath.fromINode(inode);
-        if (inode != null) {
-          src = iip.getPath();
-        }
-      }
-      final INodeFile file = checkLease(src, holder, inode, fileId);
-
-      // Remove the block from the pending creates list
-      boolean removed = dir.removeBlock(src, iip, file,
-          ExtendedBlock.getLocalBlock(b));
-      if (!removed) {
-        return true;
-      }
+      FSDirWriteFileOp.abandonBlock(dir, pc, b, fileId, src, holder);
       if(NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
                                       + b + " is removed from pendingCreates");
       }
-      persistBlocks(src, file, false);
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-
-    return true;
   }
 
-  private INodeFile checkLease(String src, String holder, INode inode,
-      long fileId) throws LeaseExpiredException, FileNotFoundException {
+  INodeFile checkLease(
+      String src, String holder, INode inode, long fileId) throws LeaseExpiredException, FileNotFoundException {
     assert hasReadLock();
     final String ident = src + " (inode " + fileId + ")";
     if (inode == null) {
@@ -3492,120 +3222,30 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    *         (e.g if not all blocks have reached minimum replication yet)
    * @throws IOException on error (eg lease mismatch, file not open, file deleted)
    */
-  boolean completeFile(final String srcArg, String holder,
+  boolean completeFile(final String src, String holder,
                        ExtendedBlock last, long fileId)
-    throws SafeModeException, UnresolvedLinkException, IOException {
-    String src = srcArg;
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " +
-          src + " for " + holder);
-    }
-    checkBlock(last);
+    throws IOException {
     boolean success = false;
     checkOperation(OperationCategory.WRITE);
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
-    FSPermissionChecker pc = getPermissionChecker();
     waitForLoadingFSImage();
+    FSPermissionChecker pc = getPermissionChecker();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot complete file " + src);
-      src = dir.resolvePath(pc, src, pathComponents);
-      success = completeFileInternal(src, holder,
-        ExtendedBlock.getLocalBlock(last), fileId);
+      success = FSDirWriteFileOp.completeFile(this, pc, src, holder, last,
+                                              fileId);
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    if (success) {
-      NameNode.stateChangeLog.info("DIR* completeFile: " + srcArg
-          + " is closed by " + holder);
-    }
     return success;
   }
 
-  private boolean completeFileInternal(String src, String holder, Block last,
-      long fileId) throws IOException {
-    assert hasWriteLock();
-    final INodeFile pendingFile;
-    final INodesInPath iip;
-    INode inode = null;
-    try {
-      if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
-        // Older clients may not have given us an inode ID to work with.
-        // In this case, we have to try to resolve the path and hope it
-        // hasn't changed or been deleted since the file was opened for write.
-        iip = dir.getINodesInPath(src, true);
-        inode = iip.getLastINode();
-      } else {
-        inode = dir.getInode(fileId);
-        iip = INodesInPath.fromINode(inode);
-        if (inode != null) {
-          src = iip.getPath();
-        }
-      }
-      pendingFile = checkLease(src, holder, inode, fileId);
-    } catch (LeaseExpiredException lee) {
-      if (inode != null && inode.isFile() &&
-          !inode.asFile().isUnderConstruction()) {
-        // This could be a retry RPC - i.e the client tried to close
-        // the file, but missed the RPC response. Thus, it is trying
-        // again to close the file. If the file still exists and
-        // the client's view of the last block matches the actual
-        // last block, then we'll treat it as a successful close.
-        // See HDFS-3031.
-        final Block realLastBlock = inode.asFile().getLastBlock();
-        if (Block.matchingIdAndGenStamp(last, realLastBlock)) {
-          NameNode.stateChangeLog.info("DIR* completeFile: " +
-              "request from " + holder + " to complete inode " + fileId +
-              "(" + src + ") which is already closed. But, it appears to be " +
-              "an RPC retry. Returning success");
-          return true;
-        }
-      }
-      throw lee;
-    }
-    // Check the state of the penultimate block. It should be completed
-    // before attempting to complete the last one.
-    if (!checkFileProgress(src, pendingFile, false)) {
-      return false;
-    }
-
-    // commit the last block and complete it if it has minimum replicas
-    commitOrCompleteLastBlock(pendingFile, iip, last);
-
-    if (!checkFileProgress(src, pendingFile, true)) {
-      return false;
-    }
-
-    finalizeINodeFileUnderConstruction(src, pendingFile,
-        Snapshot.CURRENT_STATE_ID);
-    return true;
-  }
-
-  /**
-   * Save allocated block at the given pending filename
-   * 
-   * @param src path to the file
-   * @param inodesInPath representing each of the components of src.
-   *                     The last INode is the INode for {@code src} file.
-   * @param newBlock newly allocated block to be save
-   * @param targets target datanodes where replicas of the new block is placed
-   * @throws QuotaExceededException If addition of block exceeds space quota
-   */
-  private void saveAllocatedBlock(String src, INodesInPath inodesInPath,
-      Block newBlock, DatanodeStorageInfo[] targets)
-      throws IOException {
-    assert hasWriteLock();
-    BlockInfoContiguous b = dir.addBlock(src, inodesInPath, newBlock, targets);
-    NameNode.stateChangeLog.info("BLOCK* allocate " + b + " for " + src);
-    DatanodeStorageInfo.incrementBlocksScheduled(targets);
-  }
-
   /**
    * Create new block with a unique block id and a new generation stamp.
    */
-  private Block createNewBlock() throws IOException {
+  Block createNewBlock() throws IOException {
     assert hasWriteLock();
     Block b = new Block(nextBlockId(), 0, 0);
     // Increment the generation stamp for every new block.
@@ -3997,7 +3637,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         pendingFile.getFileUnderConstructionFeature().updateLengthOfLastBlock(
             pendingFile, lastBlockLength);
       }
-      persistBlocks(src, pendingFile, false);
+      FSDirWriteFileOp.persistBlocks(dir, src, pendingFile, false);
     } finally {
       writeUnlock();
     }
@@ -4167,8 +3807,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return leaseManager.reassignLease(lease, pendingFile, newHolder);
   }
 
-  private void commitOrCompleteLastBlock(final INodeFile fileINode,
-      final INodesInPath iip, final Block commitBlock) throws IOException {
+  void commitOrCompleteLastBlock(
+      final INodeFile fileINode, final INodesInPath iip,
+      final Block commitBlock) throws IOException {
     assert hasWriteLock();
     Preconditions.checkArgument(fileINode.isUnderConstruction());
     if (!blockManager.commitOrCompleteLastBlock(fileINode, commitBlock)) {
@@ -4186,14 +3827,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
   }
 
-  private void finalizeINodeFileUnderConstruction(String src,
-      INodeFile pendingFile, int latestSnapshot) throws IOException {
+  void finalizeINodeFileUnderConstruction(
+      String src, INodeFile pendingFile, int latestSnapshot) throws IOException {
     assert hasWriteLock();
 
     FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature();
     Preconditions.checkArgument(uc != null);
     leaseManager.removeLease(uc.getClientName(), pendingFile);
-    
+
     pendingFile.recordModification(latestSnapshot);
 
     // The file is no longer pending.
@@ -4405,7 +4046,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       } else {
         // If this commit does not want to close the file, persist blocks
         src = iFile.getFullPathName();
-        persistBlocks(src, iFile, false);
+        FSDirWriteFileOp.persistBlocks(dir, src, iFile, false);
       }
     } finally {
       writeUnlock();
@@ -4596,24 +4237,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
 
   /**
-   * Persist the block list for the inode.
-   * @param path
-   * @param file
-   * @param logRetryCache
-   */
-  private void persistBlocks(String path, INodeFile file,
-                             boolean logRetryCache) {
-    assert hasWriteLock();
-    Preconditions.checkArgument(file.isUnderConstruction());
-    getEditLog().logUpdateBlocks(path, file, logRetryCache);
-    if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("persistBlocks: " + path
-              + " with " + file.getBlocks().length + " blocks is persisted to" +
-              " the file system");
-    }
-  }
-
-  /**
    * Close file.
    * @param path
    * @param file
@@ -4800,13 +4423,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   public FSEditLog getEditLog() {
     return getFSImage().getEditLog();
-  }    
-
-  private void checkBlock(ExtendedBlock block) throws IOException {
-    if (block != null && !this.blockPoolId.equals(block.getBlockPoolId())) {
-      throw new IOException("Unexpected BlockPoolId " + block.getBlockPoolId()
-          + " - expected " + blockPoolId);
-    }
   }
 
   @Metric({"MissingBlocks", "Number of missing blocks"})
@@ -5080,21 +4696,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
 
   /**
-   * Persist the new block (the last block of the given file).
-   * @param path
-   * @param file
-   */
-  private void persistNewBlock(String path, INodeFile file) {
-    Preconditions.checkArgument(file.isUnderConstruction());
-    getEditLog().logAddBlock(path, file);
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("persistNewBlock: "
-              + path + " with new block " + file.getLastBlock().toString()
-              + ", current total block count is " + file.getBlocks().length);
-    }
-  }
-
-  /**
    * SafeModeInfo contains information related to the safe mode.
    * <p>
    * An instance of {@link SafeModeInfo} is created when the name node
@@ -6399,7 +6000,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     blockinfo.setExpectedLocations(storages);
 
     String src = pendingFile.getFullPathName();
-    persistBlocks(src, pendingFile, logRetryCache);
+    FSDirWriteFileOp.persistBlocks(dir, src, pendingFile, logRetryCache);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5afac58/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 3311609..0d416a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -713,23 +713,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
       String[] favoredNodes)
       throws IOException {
     checkNNStartup();
-    if (stateChangeLog.isDebugEnabled()) {
-      stateChangeLog.debug("*BLOCK* NameNode.addBlock: file " + src
-          + " fileId=" + fileId + " for " + clientName);
-    }
-    Set<Node> excludedNodesSet = null;
-    if (excludedNodes != null) {
-      excludedNodesSet = new HashSet<Node>(excludedNodes.length);
-      for (Node node : excludedNodes) {
-        excludedNodesSet.add(node);
-      }
-    }
-    List<String> favoredNodesList = (favoredNodes == null) ? null
-        : Arrays.asList(favoredNodes);
     LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
-        clientName, previous, excludedNodesSet, favoredNodesList);
-    if (locatedBlock != null)
+        clientName, previous, excludedNodes, favoredNodes);
+    if (locatedBlock != null) {
       metrics.incrAddBlockOps();
+    }
     return locatedBlock;
   }
 
@@ -770,13 +758,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   public void abandonBlock(ExtendedBlock b, long fileId, String src,
         String holder) throws IOException {
     checkNNStartup();
-    if(stateChangeLog.isDebugEnabled()) {
-      stateChangeLog.debug("*BLOCK* NameNode.abandonBlock: "
-          +b+" of file "+src);
-    }
-    if (!namesystem.abandonBlock(b, fileId, src, holder)) {
-      throw new IOException("Cannot abandon block during write to " + src);
-    }
+    namesystem.abandonBlock(b, fileId, src, holder);
   }
 
   @Override // ClientProtocol
@@ -784,10 +766,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
                           ExtendedBlock last,  long fileId)
       throws IOException {
     checkNNStartup();
-    if(stateChangeLog.isDebugEnabled()) {
-      stateChangeLog.debug("*DIR* NameNode.complete: "
-          + src + " fileId=" + fileId +" for " + clientName);
-    }
     return namesystem.completeFile(src, clientName, last, fileId);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5afac58/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
index 5a4134c..c92e79b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.io.EnumSetWritable;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 /**
  * Race between two threads simultaneously calling
@@ -88,25 +89,40 @@ public class TestAddBlockRetry {
     // start first addBlock()
     LOG.info("Starting first addBlock for " + src);
     LocatedBlock[] onRetryBlock = new LocatedBlock[1];
-    DatanodeStorageInfo targets[] = ns.getNewBlockTargets(
-        src, HdfsConstants.GRANDFATHER_INODE_ID, "clientName",
-        null, null, null, onRetryBlock);
+    ns.readLock();
+    FSDirWriteFileOp.ValidateAddBlockResult r;
+    FSPermissionChecker pc = Mockito.mock(FSPermissionChecker.class);
+    try {
+      r = FSDirWriteFileOp.validateAddBlock(ns, pc, src,
+                                            HdfsConstants.GRANDFATHER_INODE_ID,
+                                            "clientName", null, onRetryBlock);
+    } finally {
+      ns.readUnlock();
+    }
+    DatanodeStorageInfo targets[] = FSDirWriteFileOp.chooseTargetForNewBlock(
+        ns.getBlockManager(), src, null, null, r);
     assertNotNull("Targets must be generated", targets);
 
     // run second addBlock()
     LOG.info("Starting second addBlock for " + src);
     nn.addBlock(src, "clientName", null, null,
-        HdfsConstants.GRANDFATHER_INODE_ID, null);
+                HdfsConstants.GRANDFATHER_INODE_ID, null);
     assertTrue("Penultimate block must be complete",
-        checkFileProgress(src, false));
+               checkFileProgress(src, false));
     LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);
     assertEquals("Must be one block", 1, lbs.getLocatedBlocks().size());
     LocatedBlock lb2 = lbs.get(0);
     assertEquals("Wrong replication", REPLICATION, lb2.getLocations().length);
 
     // continue first addBlock()
-    LocatedBlock newBlock = ns.storeAllocatedBlock(
-        src, HdfsConstants.GRANDFATHER_INODE_ID, "clientName", null, targets);
+    ns.writeLock();
+    LocatedBlock newBlock;
+    try {
+      newBlock = FSDirWriteFileOp.storeAllocatedBlock(ns, src,
+          HdfsConstants.GRANDFATHER_INODE_ID, "clientName", null, targets);
+    } finally {
+      ns.writeUnlock();
+    }
     assertEquals("Blocks are not equal", lb2.getBlock(), newBlock.getBlock());
 
     // check locations

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5afac58/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java
index 3049612..ea560fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderCon
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 
 import java.io.IOException;
 
@@ -45,7 +46,9 @@ public class TestCommitBlockSynchronization {
   private FSNamesystem makeNameSystemSpy(Block block, INodeFile file)
       throws IOException {
     Configuration conf = new Configuration();
+    FSEditLog editlog = mock(FSEditLog.class);
     FSImage image = new FSImage(conf);
+    Whitebox.setInternalState(image, "editLog", editlog);
     final DatanodeStorageInfo[] targets = {};
 
     FSNamesystem namesystem = new FSNamesystem(conf, image);


Mime
View raw message