hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From whe...@apache.org
Subject hadoop git commit: HDFS-8495. Consolidate append() related implementation into a single class. Contributed by Rakesh R.
Date Wed, 22 Jul 2015 00:28:42 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk 393fe7177 -> 31f117138


HDFS-8495. Consolidate append() related implementation into a single class. Contributed by
Rakesh R.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/31f11713
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/31f11713
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/31f11713

Branch: refs/heads/trunk
Commit: 31f117138a00794de4951ee8433e304d72b04094
Parents: 393fe71
Author: Haohui Mai <wheat9@apache.org>
Authored: Tue Jul 21 17:25:23 2015 -0700
Committer: Haohui Mai <wheat9@apache.org>
Committed: Tue Jul 21 17:25:23 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hdfs/server/namenode/FSDirAppendOp.java     | 261 +++++++++++++++++++
 .../server/namenode/FSDirStatAndListingOp.java  |   2 +-
 .../hdfs/server/namenode/FSDirTruncateOp.java   |  16 +-
 .../hdfs/server/namenode/FSDirWriteFileOp.java  |   6 +-
 .../hdfs/server/namenode/FSEditLogLoader.java   |   4 +-
 .../hdfs/server/namenode/FSNamesystem.java      | 241 ++---------------
 7 files changed, 304 insertions(+), 229 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/31f11713/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 8122045..50803de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -737,6 +737,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8721. Add a metric for number of encryption zones.
     (Rakesh R via cnauroth)
 
+    HDFS-8495. Consolidate append() related implementation into a single class.
+    (Rakesh R via wheat9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31f11713/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
new file mode 100644
index 0000000..abb2dc8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion.Feature;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Helper class to perform append operation.
+ */
+final class FSDirAppendOp {
+
+  /**
+   * Private constructor for preventing FSDirAppendOp object creation.
+   * Static-only class.
+   */
+  private FSDirAppendOp() {}
+
+  /**
+   * Append to an existing file.
+   * <p>
+   *
+   * The method returns the last block of the file if this is a partial block,
+   * which can still be used for writing more data. The client uses the
+   * returned block locations to form the data pipeline for this block.<br>
+   * The {@link LocatedBlock} will be null if the last block is full.
+   * The client then allocates a new block with the next call using
+   * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#addBlock}.
+   * <p>
+   *
+   * For description of parameters and exceptions thrown see
+   * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#append}
+   *
+   * @param fsn namespace
+   * @param srcArg path name
+   * @param pc permission checker to check fs permission
+   * @param holder client name
+   * @param clientMachine client machine info
+   * @param newBlock if the data is appended to a new block
+   * @param logRetryCache whether to record RPC ids in editlog for retry cache
+   *                      rebuilding
+   * @return the last block with status
+   */
+  static LastBlockWithStatus appendFile(final FSNamesystem fsn,
+      final String srcArg, final FSPermissionChecker pc, final String holder,
+      final String clientMachine, final boolean newBlock,
+      final boolean logRetryCache) throws IOException {
+    assert fsn.hasWriteLock();
+
+    final byte[][] pathComponents = FSDirectory
+        .getPathComponentsForReservedPath(srcArg);
+    final LocatedBlock lb;
+    final FSDirectory fsd = fsn.getFSDirectory();
+    final String src;
+    fsd.writeLock();
+    try {
+      src = fsd.resolvePath(pc, srcArg, pathComponents);
+      final INodesInPath iip = fsd.getINodesInPath4Write(src);
+      // Verify that the destination does not exist as a directory already
+      final INode inode = iip.getLastINode();
+      final String path = iip.getPath();
+      if (inode != null && inode.isDirectory()) {
+        throw new FileAlreadyExistsException("Cannot append to directory "
+            + path + "; already exists as a directory.");
+      }
+      if (fsd.isPermissionEnabled()) {
+        fsd.checkPathAccess(pc, iip, FsAction.WRITE);
+      }
+
+      if (inode == null) {
+        throw new FileNotFoundException(
+            "Failed to append to non-existent file " + path + " for client "
+                + clientMachine);
+      }
+      final INodeFile file = INodeFile.valueOf(inode, path, true);
+      BlockManager blockManager = fsd.getBlockManager();
+      final BlockStoragePolicy lpPolicy = blockManager
+          .getStoragePolicy("LAZY_PERSIST");
+      if (lpPolicy != null && lpPolicy.getId() == file.getStoragePolicyID()) {
+        throw new UnsupportedOperationException(
+            "Cannot append to lazy persist file " + path);
+      }
+      // Opening an existing file for append - may need to recover lease.
+      fsn.recoverLeaseInternal(RecoverLeaseOp.APPEND_FILE, iip, path, holder,
+          clientMachine, false);
+
+      final BlockInfo lastBlock = file.getLastBlock();
+      // Check that the block has at least minimum replication.
+      if (lastBlock != null && lastBlock.isComplete()
+          && !blockManager.isSufficientlyReplicated(lastBlock)) {
+        throw new IOException("append: lastBlock=" + lastBlock + " of src="
+            + path + " is not sufficiently replicated yet.");
+      }
+      lb = prepareFileForAppend(fsn, iip, holder, clientMachine, newBlock,
+          true, logRetryCache);
+    } catch (IOException ie) {
+      NameNode.stateChangeLog
+          .warn("DIR* NameSystem.append: " + ie.getMessage());
+      throw ie;
+    } finally {
+      fsd.writeUnlock();
+    }
+
+    HdfsFileStatus stat = FSDirStatAndListingOp.getFileInfo(fsd, src, false,
+        FSDirectory.isReservedRawName(srcArg), true);
+    if (lb != null) {
+      NameNode.stateChangeLog.debug(
+          "DIR* NameSystem.appendFile: file {} for {} at {} block {} block"
+              + " size {}", srcArg, holder, clientMachine, lb.getBlock(), lb
+              .getBlock().getNumBytes());
+    }
+    return new LastBlockWithStatus(lb, stat);
+  }
+
+  /**
+   * Convert current node to under construction.
+   * Recreate in-memory lease record.
+   *
+   * @param fsn namespace
+   * @param iip inodes in the path containing the file
+   * @param leaseHolder identifier of the lease holder on this file
+   * @param clientMachine identifier of the client machine
+   * @param newBlock if the data is appended to a new block
+   * @param writeToEditLog whether to persist this change to the edit log
+   * @param logRetryCache whether to record RPC ids in editlog for retry cache
+   *                      rebuilding
+   * @return the last block locations if the block is partial or null otherwise
+   * @throws IOException
+   */
+  static LocatedBlock prepareFileForAppend(final FSNamesystem fsn,
+      final INodesInPath iip, final String leaseHolder,
+      final String clientMachine, final boolean newBlock,
+      final boolean writeToEditLog, final boolean logRetryCache)
+      throws IOException {
+    assert fsn.hasWriteLock();
+
+    final INodeFile file = iip.getLastINode().asFile();
+    final QuotaCounts delta = verifyQuotaForUCBlock(fsn, file, iip);
+
+    file.recordModification(iip.getLatestSnapshotId());
+    file.toUnderConstruction(leaseHolder, clientMachine);
+
+    fsn.getLeaseManager().addLease(
+        file.getFileUnderConstructionFeature().getClientName(), file.getId());
+
+    LocatedBlock ret = null;
+    if (!newBlock) {
+      FSDirectory fsd = fsn.getFSDirectory();
+      ret = fsd.getBlockManager().convertLastBlockToUnderConstruction(file, 0);
+      if (ret != null && delta != null) {
+        Preconditions.checkState(delta.getStorageSpace() >= 0, "appending to"
+            + " a block with size larger than the preferred block size");
+        fsd.writeLock();
+        try {
+          fsd.updateCountNoQuotaCheck(iip, iip.length() - 1, delta);
+        } finally {
+          fsd.writeUnlock();
+        }
+      }
+    } else {
+      BlockInfo lastBlock = file.getLastBlock();
+      if (lastBlock != null) {
+        ExtendedBlock blk = new ExtendedBlock(fsn.getBlockPoolId(), lastBlock);
+        ret = new LocatedBlock(blk, new DatanodeInfo[0]);
+      }
+    }
+
+    if (writeToEditLog) {
+      final String path = iip.getPath();
+      if (NameNodeLayoutVersion.supports(Feature.APPEND_NEW_BLOCK,
+          fsn.getEffectiveLayoutVersion())) {
+        fsn.getEditLog().logAppendFile(path, file, newBlock, logRetryCache);
+      } else {
+        fsn.getEditLog().logOpenFile(path, file, false, logRetryCache);
+      }
+    }
+    return ret;
+  }
+
+  /**
+   * Verify quota when using the preferred block size for UC block. This is
+   * usually used by append and truncate.
+   *
+   * @throws QuotaExceededException when violating the storage quota
+   * @return expected quota usage update. null means no change or no need to
+   *         update quota usage later
+   */
+  private static QuotaCounts verifyQuotaForUCBlock(FSNamesystem fsn,
+      INodeFile file, INodesInPath iip) throws QuotaExceededException {
+    FSDirectory fsd = fsn.getFSDirectory();
+    if (!fsn.isImageLoaded() || fsd.shouldSkipQuotaChecks()) {
+      // Do not check quota if editlog is still being processed
+      return null;
+    }
+    if (file.getLastBlock() != null) {
+      final QuotaCounts delta = computeQuotaDeltaForUCBlock(fsn, file);
+      fsd.readLock();
+      try {
+        FSDirectory.verifyQuota(iip, iip.length() - 1, delta, null);
+        return delta;
+      } finally {
+        fsd.readUnlock();
+      }
+    }
+    return null;
+  }
+
+  /** Compute quota change for converting a complete block to a UC block. */
+  private static QuotaCounts computeQuotaDeltaForUCBlock(FSNamesystem fsn,
+      INodeFile file) {
+    final QuotaCounts delta = new QuotaCounts.Builder().build();
+    final BlockInfo lastBlock = file.getLastBlock();
+    if (lastBlock != null) {
+      final long diff = file.getPreferredBlockSize() - lastBlock.getNumBytes();
+      final short repl = file.getPreferredBlockReplication();
+      delta.addStorageSpace(diff * repl);
+      final BlockStoragePolicy policy = fsn.getFSDirectory()
+          .getBlockStoragePolicySuite().getPolicy(file.getStoragePolicyID());
+      List<StorageType> types = policy.chooseStorageTypes(repl);
+      for (StorageType t : types) {
+        if (t.supportTypeQuota()) {
+          delta.addTypeSpace(t, diff);
+        }
+      }
+    }
+    return delta;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31f11713/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
index 201dabc..14f4d66 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
@@ -508,7 +508,7 @@ class FSDirStatAndListingOp {
       final long fileSize = !inSnapshot && isUc ?
           fileNode.computeFileSizeNotIncludingLastUcBlock() : size;
 
-      loc = fsd.getFSNamesystem().getBlockManager().createLocatedBlocks(
+      loc = fsd.getBlockManager().createLocatedBlocks(
           fileNode.getBlocks(snapshot), fileSize, isUc, 0L, size, false,
           inSnapshot, feInfo);
       if (loc == null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31f11713/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
index 9fc9def..e24bb2f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstructionContiguous;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
@@ -79,11 +80,11 @@ final class FSDirTruncateOp {
     try {
       src = fsd.resolvePath(pc, srcArg, pathComponents);
       iip = fsd.getINodesInPath4Write(src, true);
-      if (fsn.isPermissionEnabled()) {
+      if (fsd.isPermissionEnabled()) {
         fsd.checkPathAccess(pc, iip, FsAction.WRITE);
       }
       INodeFile file = INodeFile.valueOf(iip.getLastINode(), src);
-      final BlockStoragePolicy lpPolicy = fsn.getBlockManager()
+      final BlockStoragePolicy lpPolicy = fsd.getBlockManager()
           .getStoragePolicy("LAZY_PERSIST");
 
       if (lpPolicy != null && lpPolicy.getId() == file.getStoragePolicyID()) {
@@ -178,7 +179,7 @@ final class FSDirTruncateOp {
           "Should be the same block.";
       if (oldBlock.getBlockId() != tBlk.getBlockId()
           && !file.isBlockInLatestSnapshot(oldBlock)) {
-        fsn.getBlockManager().removeBlockFromMap(oldBlock);
+        fsd.getBlockManager().removeBlockFromMap(oldBlock);
       }
     }
     assert onBlockBoundary == (truncateBlock == null) :
@@ -223,6 +224,7 @@ final class FSDirTruncateOp {
     }
 
     BlockInfoUnderConstruction truncatedBlockUC;
+    BlockManager blockManager = fsn.getFSDirectory().getBlockManager();
     if (shouldCopyOnTruncate) {
       // Add new truncateBlock into blocksMap and
       // use oldBlock as a source for copy-on-truncate recovery
@@ -230,9 +232,8 @@ final class FSDirTruncateOp {
           file.getPreferredBlockReplication());
       truncatedBlockUC.setNumBytes(oldBlock.getNumBytes() - lastBlockDelta);
       truncatedBlockUC.setTruncateBlock(oldBlock);
-      file.setLastBlock(truncatedBlockUC,
-          fsn.getBlockManager().getStorages(oldBlock));
-      fsn.getBlockManager().addBlockCollection(truncatedBlockUC, file);
+      file.setLastBlock(truncatedBlockUC, blockManager.getStorages(oldBlock));
+      blockManager.addBlockCollection(truncatedBlockUC, file);
 
       NameNode.stateChangeLog.debug(
           "BLOCK* prepareFileForTruncate: Scheduling copy-on-truncate to new"
@@ -241,8 +242,7 @@ final class FSDirTruncateOp {
           truncatedBlockUC.getTruncateBlock());
     } else {
       // Use new generation stamp for in-place truncate recovery
-      fsn.getBlockManager().convertLastBlockToUnderConstruction(file,
-          lastBlockDelta);
+      blockManager.convertLastBlockToUnderConstruction(file, lastBlockDelta);
       oldBlock = file.getLastBlock();
       assert !oldBlock.isComplete() : "oldBlock should be under construction";
       truncatedBlockUC = (BlockInfoUnderConstruction) oldBlock;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31f11713/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index 4830d5d..008a945 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@ -206,8 +206,8 @@ class FSDirWriteFileOp {
       DatanodeStorageInfo[] locs, long offset) throws IOException {
     LocatedBlock lBlk = BlockManager.newLocatedBlock(fsn.getExtendedBlock(blk),
                                                      locs, offset, false);
-    fsn.getBlockManager().setBlockToken(lBlk,
-                                        BlockTokenIdentifier.AccessMode.WRITE);
+    fsn.getFSDirectory().getBlockManager()
+        .setBlockToken(lBlk, BlockTokenIdentifier.AccessMode.WRITE);
     return lBlk;
   }
 
@@ -426,7 +426,7 @@ class FSDirWriteFileOp {
       fsd.setFileEncryptionInfo(src, feInfo);
       newNode = fsd.getInode(newNode.getId()).asFile();
     }
-    setNewINodeStoragePolicy(fsn.getBlockManager(), newNode, iip,
+    setNewINodeStoragePolicy(fsd.getBlockManager(), newNode, iip,
                              isLazyPersist);
     fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
     if (NameNode.stateChangeLog.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31f11713/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 63ef985..357684a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -392,7 +392,7 @@ public class FSEditLogLoader {
             FSNamesystem.LOG.debug("Reopening an already-closed file " +
                 "for append");
           }
-          LocatedBlock lb = fsNamesys.prepareFileForAppend(path, iip,
+          LocatedBlock lb = FSDirAppendOp.prepareFileForAppend(fsNamesys, iip,
               addCloseOp.clientName, addCloseOp.clientMachine, false, false,
               false);
           // add the op into retry cache if necessary
@@ -466,7 +466,7 @@ public class FSEditLogLoader {
       INodesInPath iip = fsDir.getINodesInPath4Write(path);
       INodeFile file = INodeFile.valueOf(iip.getLastINode(), path);
       if (!file.isUnderConstruction()) {
-        LocatedBlock lb = fsNamesys.prepareFileForAppend(path, iip,
+        LocatedBlock lb = FSDirAppendOp.prepareFileForAppend(fsNamesys, iip,
             appendOp.clientName, appendOp.clientMachine, appendOp.newBlock,
             false, false);
         // add the op into retry cache if necessary

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31f11713/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index fd37fbe..0b44431 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -142,7 +142,6 @@ import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
@@ -185,7 +184,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeException;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
@@ -250,7 +248,6 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
-import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RetriableException;
@@ -2174,175 +2171,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
 
   /**
-   * Append to an existing file for append.
-   * <p>
-   * 
-   * The method returns the last block of the file if this is a partial block,
-   * which can still be used for writing more data. The client uses the returned
-   * block locations to form the data pipeline for this block.<br>
-   * The method returns null if the last block is full. The client then
-   * allocates a new block with the next call using
-   * {@link ClientProtocol#addBlock}.
-   * <p>
-   * 
-   * For description of parameters and exceptions thrown see
-   * {@link ClientProtocol#append(String, String, EnumSetWritable)}
-   *
-   * @return the last block locations if the block is partial or null otherwise
-   */
-  private LocatedBlock appendFileInternal(FSPermissionChecker pc,
-      INodesInPath iip, String holder, String clientMachine, boolean newBlock,
-      boolean logRetryCache) throws IOException {
-    assert hasWriteLock();
-    // Verify that the destination does not exist as a directory already.
-    final INode inode = iip.getLastINode();
-    final String src = iip.getPath();
-    if (inode != null && inode.isDirectory()) {
-      throw new FileAlreadyExistsException("Cannot append to directory " + src
-          + "; already exists as a directory.");
-    }
-    if (isPermissionEnabled) {
-      dir.checkPathAccess(pc, iip, FsAction.WRITE);
-    }
-
-    try {
-      if (inode == null) {
-        throw new FileNotFoundException("failed to append to non-existent file "
-          + src + " for client " + clientMachine);
-      }
-      INodeFile myFile = INodeFile.valueOf(inode, src, true);
-      final BlockStoragePolicy lpPolicy =
-          blockManager.getStoragePolicy("LAZY_PERSIST");
-      if (lpPolicy != null &&
-          lpPolicy.getId() == myFile.getStoragePolicyID()) {
-        throw new UnsupportedOperationException(
-            "Cannot append to lazy persist file " + src);
-      }
-      // Opening an existing file for append - may need to recover lease.
-      recoverLeaseInternal(RecoverLeaseOp.APPEND_FILE, iip, src, holder,
-                           clientMachine, false);
-      
-      final BlockInfo lastBlock = myFile.getLastBlock();
-      // Check that the block has at least minimum replication.
-      if(lastBlock != null && lastBlock.isComplete() &&
-          !getBlockManager().isSufficientlyReplicated(lastBlock)) {
-        throw new IOException("append: lastBlock=" + lastBlock +
-            " of src=" + src + " is not sufficiently replicated yet.");
-      }
-      return prepareFileForAppend(src, iip, holder, clientMachine, newBlock,
-          true, logRetryCache);
-    } catch (IOException ie) {
-      NameNode.stateChangeLog.warn("DIR* NameSystem.append: " +ie.getMessage());
-      throw ie;
-    }
-  }
-  
-  /**
-   * Convert current node to under construction.
-   * Recreate in-memory lease record.
-   * 
-   * @param src path to the file
-   * @param leaseHolder identifier of the lease holder on this file
-   * @param clientMachine identifier of the client machine
-   * @param newBlock if the data is appended to a new block
-   * @param writeToEditLog whether to persist this change to the edit log
-   * @param logRetryCache whether to record RPC ids in editlog for retry cache
-   *                      rebuilding
-   * @return the last block locations if the block is partial or null otherwise
-   * @throws UnresolvedLinkException
-   * @throws IOException
-   */
-  LocatedBlock prepareFileForAppend(String src, INodesInPath iip,
-      String leaseHolder, String clientMachine, boolean newBlock,
-      boolean writeToEditLog, boolean logRetryCache) throws IOException {
-    final INodeFile file = iip.getLastINode().asFile();
-    final QuotaCounts delta = verifyQuotaForUCBlock(file, iip);
-
-    file.recordModification(iip.getLatestSnapshotId());
-    file.toUnderConstruction(leaseHolder, clientMachine);
-
-    leaseManager.addLease(
-        file.getFileUnderConstructionFeature().getClientName(), file.getId());
-
-    LocatedBlock ret = null;
-    if (!newBlock) {
-      ret = blockManager.convertLastBlockToUnderConstruction(file, 0);
-      if (ret != null && delta != null) {
-        Preconditions.checkState(delta.getStorageSpace() >= 0,
-            "appending to a block with size larger than the preferred block size");
-        dir.writeLock();
-        try {
-          dir.updateCountNoQuotaCheck(iip, iip.length() - 1, delta);
-        } finally {
-          dir.writeUnlock();
-        }
-      }
-    } else {
-      BlockInfo lastBlock = file.getLastBlock();
-      if (lastBlock != null) {
-        ExtendedBlock blk = new ExtendedBlock(this.getBlockPoolId(), lastBlock);
-        ret = new LocatedBlock(blk, new DatanodeInfo[0]);
-      }
-    }
-
-    if (writeToEditLog) {
-      if (NameNodeLayoutVersion.supports(Feature.APPEND_NEW_BLOCK,
-          getEffectiveLayoutVersion())) {
-        getEditLog().logAppendFile(src, file, newBlock, logRetryCache);
-      } else {
-        getEditLog().logOpenFile(src, file, false, logRetryCache);
-      }
-    }
-    return ret;
-  }
-
-  /**
-   * Verify quota when using the preferred block size for UC block. This is
-   * usually used by append and truncate
-   * @throws QuotaExceededException when violating the storage quota
-   * @return expected quota usage update. null means no change or no need to
-   *         update quota usage later
-   */
-  private QuotaCounts verifyQuotaForUCBlock(INodeFile file, INodesInPath iip)
-      throws QuotaExceededException {
-    if (!isImageLoaded() || dir.shouldSkipQuotaChecks()) {
-      // Do not check quota if editlog is still being processed
-      return null;
-    }
-    if (file.getLastBlock() != null) {
-      final QuotaCounts delta = computeQuotaDeltaForUCBlock(file);
-      dir.readLock();
-      try {
-        FSDirectory.verifyQuota(iip, iip.length() - 1, delta, null);
-        return delta;
-      } finally {
-        dir.readUnlock();
-      }
-    }
-    return null;
-  }
-
-  /** Compute quota change for converting a complete block to a UC block */
-  private QuotaCounts computeQuotaDeltaForUCBlock(INodeFile file) {
-    final QuotaCounts delta = new QuotaCounts.Builder().build();
-    final BlockInfo lastBlock = file.getLastBlock();
-    if (lastBlock != null) {
-      final long diff = file.getPreferredBlockSize() - lastBlock.getNumBytes();
-      final short repl = file.getPreferredBlockReplication();
-      delta.addStorageSpace(diff * repl);
-      final BlockStoragePolicy policy = dir.getBlockStoragePolicySuite()
-          .getPolicy(file.getStoragePolicyID());
-      List<StorageType> types = policy.chooseStorageTypes(repl);
-      for (StorageType t : types) {
-        if (t.supportTypeQuota()) {
-          delta.addTypeSpace(t, diff);
-        }
-      }
-    }
-    return delta;
-  }
-
-  /**
    * Recover lease;
    * Immediately revoke the lease of the current lease holder and start lease
    * recovery so that the file can be forced to be closed.
@@ -2487,62 +2315,45 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   /**
    * Append to an existing file in the namespace.
    */
-  LastBlockWithStatus appendFile(String src, String holder,
+  LastBlockWithStatus appendFile(String srcArg, String holder,
       String clientMachine, EnumSet<CreateFlag> flag, boolean logRetryCache)
       throws IOException {
     boolean newBlock = flag.contains(CreateFlag.NEW_BLOCK);
     if (newBlock) {
       requireEffectiveLayoutVersionForFeature(Feature.APPEND_NEW_BLOCK);
     }
-    try {
-      return appendFileInt(src, holder, clientMachine, newBlock, logRetryCache);
-    } catch (AccessControlException e) {
-      logAuditEvent(false, "append", src);
-      throw e;
-    }
-  }
 
-  private LastBlockWithStatus appendFileInt(final String srcArg, String holder,
-      String clientMachine, boolean newBlock, boolean logRetryCache)
-      throws IOException {
-    String src = srcArg;
     NameNode.stateChangeLog.debug(
         "DIR* NameSystem.appendFile: src={}, holder={}, clientMachine={}",
-        src, holder, clientMachine);
-    boolean skipSync = false;
-    LocatedBlock lb = null;
-    HdfsFileStatus stat = null;
-    FSPermissionChecker pc = getPermissionChecker();
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
-    writeLock();
+        srcArg, holder, clientMachine);
     try {
+      boolean skipSync = false;
+      LastBlockWithStatus lbs = null;
+      final FSPermissionChecker pc = getPermissionChecker();
       checkOperation(OperationCategory.WRITE);
-      checkNameNodeSafeMode("Cannot append to file" + src);
-      src = dir.resolvePath(pc, src, pathComponents);
-      final INodesInPath iip = dir.getINodesInPath4Write(src);
-      lb = appendFileInternal(pc, iip, holder, clientMachine, newBlock,
-          logRetryCache);
-      stat = FSDirStatAndListingOp.getFileInfo(dir, src, false,
-          FSDirectory.isReservedRawName(srcArg), true);
-    } catch (StandbyException se) {
-      skipSync = true;
-      throw se;
-    } finally {
-      writeUnlock();
-      // There might be transactions logged while trying to recover the lease.
-      // They need to be sync'ed even when an exception was thrown.
-      if (!skipSync) {
-        getEditLog().logSync();
+      writeLock();
+      try {
+        checkOperation(OperationCategory.WRITE);
+        checkNameNodeSafeMode("Cannot append to file" + srcArg);
+        lbs = FSDirAppendOp.appendFile(this, srcArg, pc, holder, clientMachine,
+            newBlock, logRetryCache);
+      } catch (StandbyException se) {
+        skipSync = true;
+        throw se;
+      } finally {
+        writeUnlock();
+        // There might be transactions logged while trying to recover the lease.
+        // They need to be sync'ed even when an exception was thrown.
+        if (!skipSync) {
+          getEditLog().logSync();
+        }
       }
+      logAuditEvent(true, "append", srcArg);
+      return lbs;
+    } catch (AccessControlException e) {
+      logAuditEvent(false, "append", srcArg);
+      throw e;
     }
-    if (lb != null) {
-      NameNode.stateChangeLog.debug(
-          "DIR* NameSystem.appendFile: file {} for {} at {} block {} block" +
-          " size {}", src, holder, clientMachine, lb.getBlock(),
-          lb.getBlock().getNumBytes());
-    }
-    logAuditEvent(true, "append", srcArg);
-    return new LastBlockWithStatus(lb, stat);
   }
 
   ExtendedBlock getExtendedBlock(Block blk) {


Mime
View raw message