hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiten...@apache.org
Subject [7/7] hadoop git commit: HDFS-8421. Move startFile() and related functions into FSDirWriteFileOp. Contributed by Haohui Mai.
Date Thu, 21 May 2015 16:01:52 GMT
HDFS-8421. Move startFile() and related functions into FSDirWriteFileOp. Contributed by Haohui
Mai.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2b6bcfda
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2b6bcfda
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2b6bcfda

Branch: refs/heads/HDFS-7240
Commit: 2b6bcfdafa91223a4116e3e9304579f5f91dccac
Parents: 0305316
Author: Haohui Mai <wheat9@apache.org>
Authored: Thu May 21 08:05:10 2015 -0700
Committer: Haohui Mai <wheat9@apache.org>
Committed: Thu May 21 08:08:28 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hdfs/server/namenode/FSDirWriteFileOp.java  | 324 ++++++++++++++++++-
 .../hdfs/server/namenode/FSDirectory.java       |  91 ------
 .../hdfs/server/namenode/FSEditLogLoader.java   |  15 +-
 .../hdfs/server/namenode/FSNamesystem.java      | 280 +++-------------
 5 files changed, 371 insertions(+), 342 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b6bcfda/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 77d7369..9cfad7d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -577,6 +577,9 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-4383. Document the lease limits. (Arshad Mohammad via aajisaka)
 
+    HDFS-8421. Move startFile() and related functions into FSDirWriteFileOp.
+    (wheat9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b6bcfda/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index 1ff0899..307bd59 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@ -18,11 +18,27 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import com.google.common.base.Preconditions;
+import org.apache.commons.io.Charsets;
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.fs.XAttr;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -34,15 +50,22 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.util.ChunkedArrayList;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID;
+import static org.apache.hadoop.util.Time.now;
+
 class FSDirWriteFileOp {
   private FSDirWriteFileOp() {}
   static boolean unprotectedRemoveBlock(
@@ -278,6 +301,210 @@ class FSDirWriteFileOp {
   }
 
   /**
+   * Create a new file or overwrite an existing file<br>
+   *
+   * Once the file is created, the client allocates a new block with the next
+   * call using {@link ClientProtocol#addBlock}.
+   * <p>
+   * For a description of the parameters and the exceptions thrown, see
+   * {@link ClientProtocol#create}.
+   */
+  static HdfsFileStatus startFile(
+      FSNamesystem fsn, FSPermissionChecker pc, String src,
+      PermissionStatus permissions, String holder, String clientMachine,
+      EnumSet<CreateFlag> flag, boolean createParent,
+      short replication, long blockSize,
+      EncryptionKeyInfo ezInfo, INode.BlocksMapUpdateInfo toRemoveBlocks,
+      boolean logRetryEntry)
+      throws IOException {
+    assert fsn.hasWriteLock();
+
+    boolean create = flag.contains(CreateFlag.CREATE);
+    boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
+    boolean isLazyPersist = flag.contains(CreateFlag.LAZY_PERSIST);
+
+    CipherSuite suite = null;
+    CryptoProtocolVersion version = null;
+    KeyProviderCryptoExtension.EncryptedKeyVersion edek = null;
+
+    if (ezInfo != null) {
+      edek = ezInfo.edek;
+      suite = ezInfo.suite;
+      version = ezInfo.protocolVersion;
+    }
+
+    boolean isRawPath = FSDirectory.isReservedRawName(src);
+    FSDirectory fsd = fsn.getFSDirectory();
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    src = fsd.resolvePath(pc, src, pathComponents);
+    INodesInPath iip = fsd.getINodesInPath4Write(src);
+
+    // Verify that the destination does not exist as a directory already.
+    final INode inode = iip.getLastINode();
+    if (inode != null && inode.isDirectory()) {
+      throw new FileAlreadyExistsException(src +
+          " already exists as a directory");
+    }
+
+    final INodeFile myFile = INodeFile.valueOf(inode, src, true);
+    if (fsd.isPermissionEnabled()) {
+      if (overwrite && myFile != null) {
+        fsd.checkPathAccess(pc, iip, FsAction.WRITE);
+      }
+      /*
+       * To overwrite an existing file, we need to check the 'w' permission
+       * of the parent (which equals the ancestor in this case).
+       */
+      fsd.checkAncestorAccess(pc, iip, FsAction.WRITE);
+    }
+
+    if (!createParent) {
+      fsd.verifyParentDir(iip, src);
+    }
+
+    if (myFile == null && !create) {
+      throw new FileNotFoundException("Can't overwrite non-existent " +
+          src + " for client " + clientMachine);
+    }
+
+    FileEncryptionInfo feInfo = null;
+
+    final EncryptionZone zone = fsd.getEZForPath(iip);
+    if (zone != null) {
+      // The path is now within an EZ, but we're missing encryption parameters
+      if (suite == null || edek == null) {
+        throw new RetryStartFileException();
+      }
+      // Path is within an EZ and we have provided encryption parameters.
+      // Make sure that the generated EDEK matches the settings of the EZ.
+      final String ezKeyName = zone.getKeyName();
+      if (!ezKeyName.equals(edek.getEncryptionKeyName())) {
+        throw new RetryStartFileException();
+      }
+      feInfo = new FileEncryptionInfo(suite, version,
+          edek.getEncryptedKeyVersion().getMaterial(),
+          edek.getEncryptedKeyIv(),
+          ezKeyName, edek.getEncryptionKeyVersionName());
+    }
+
+    if (myFile != null) {
+      if (overwrite) {
+        List<INode> toRemoveINodes = new ChunkedArrayList<>();
+        List<Long> toRemoveUCFiles = new ChunkedArrayList<>();
+        long ret = FSDirDeleteOp.delete(fsd, iip, toRemoveBlocks,
+                                        toRemoveINodes, toRemoveUCFiles, now());
+        if (ret >= 0) {
+          iip = INodesInPath.replace(iip, iip.length() - 1, null);
+          FSDirDeleteOp.incrDeletedFileCount(ret);
+          fsn.removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes, true);
+        }
+      } else {
+        // If lease soft limit time is expired, recover the lease
+        fsn.recoverLeaseInternal(FSNamesystem.RecoverLeaseOp.CREATE_FILE, iip,
+                                 src, holder, clientMachine, false);
+        throw new FileAlreadyExistsException(src + " for client " +
+            clientMachine + " already exists");
+      }
+    }
+    fsn.checkFsObjectLimit();
+    INodeFile newNode = null;
+    Map.Entry<INodesInPath, String> parent = FSDirMkdirOp
+        .createAncestorDirectories(fsd, iip, permissions);
+    if (parent != null) {
+      iip = addFile(fsd, parent.getKey(), parent.getValue(), permissions,
+                    replication, blockSize, holder, clientMachine);
+      newNode = iip != null ? iip.getLastINode().asFile() : null;
+    }
+    if (newNode == null) {
+      throw new IOException("Unable to add " + src +  " to namespace");
+    }
+    fsn.leaseManager.addLease(
+        newNode.getFileUnderConstructionFeature().getClientName(),
+        newNode.getId());
+    if (feInfo != null) {
+      fsd.setFileEncryptionInfo(src, feInfo);
+      newNode = fsd.getInode(newNode.getId()).asFile();
+    }
+    setNewINodeStoragePolicy(fsn.getBlockManager(), newNode, iip,
+                             isLazyPersist);
+    fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
+          src + " inode " + newNode.getId() + " " + holder);
+    }
+    return FSDirStatAndListingOp.getFileInfo(fsd, src, false, isRawPath, true);
+  }
+
+  static EncryptionKeyInfo getEncryptionKeyInfo(FSNamesystem fsn,
+      FSPermissionChecker pc, String src,
+      CryptoProtocolVersion[] supportedVersions)
+      throws IOException {
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    FSDirectory fsd = fsn.getFSDirectory();
+    src = fsd.resolvePath(pc, src, pathComponents);
+    INodesInPath iip = fsd.getINodesInPath4Write(src);
+    // Nothing to do if the path is not within an EZ
+    final EncryptionZone zone = fsd.getEZForPath(iip);
+    if (zone == null) {
+      return null;
+    }
+    CryptoProtocolVersion protocolVersion = fsn.chooseProtocolVersion(
+        zone, supportedVersions);
+    CipherSuite suite = zone.getSuite();
+    String ezKeyName = zone.getKeyName();
+
+    Preconditions.checkNotNull(protocolVersion);
+    Preconditions.checkNotNull(suite);
+    Preconditions.checkArgument(!suite.equals(CipherSuite.UNKNOWN),
+                                "Chose an UNKNOWN CipherSuite!");
+    Preconditions.checkNotNull(ezKeyName);
+    return new EncryptionKeyInfo(protocolVersion, suite, ezKeyName);
+  }
+
+  static INodeFile addFileForEditLog(
+      FSDirectory fsd, long id, INodesInPath existing, byte[] localName,
+      PermissionStatus permissions, List<AclEntry> aclEntries,
+      List<XAttr> xAttrs, short replication, long modificationTime, long atime,
+      long preferredBlockSize, boolean underConstruction, String clientName,
+      String clientMachine, byte storagePolicyId) {
+    final INodeFile newNode;
+    assert fsd.hasWriteLock();
+    if (underConstruction) {
+      newNode = newINodeFile(id, permissions, modificationTime,
+                                              modificationTime, replication,
+                                              preferredBlockSize,
+                                              storagePolicyId);
+      newNode.toUnderConstruction(clientName, clientMachine);
+    } else {
+      newNode = newINodeFile(id, permissions, modificationTime,
+                                              atime, replication,
+                                              preferredBlockSize,
+                                              storagePolicyId);
+    }
+
+    newNode.setLocalName(localName);
+    try {
+      INodesInPath iip = fsd.addINode(existing, newNode);
+      if (iip != null) {
+        if (aclEntries != null) {
+          AclStorage.updateINodeAcl(newNode, aclEntries, CURRENT_STATE_ID);
+        }
+        if (xAttrs != null) {
+          XAttrStorage.updateINodeXAttrs(newNode, xAttrs, CURRENT_STATE_ID);
+        }
+        return newNode;
+      }
+    } catch (IOException e) {
+      if(NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug(
+            "DIR* FSDirectory.unprotectedAddFile: exception when add "
+                + existing.getPath() + " to the file system", e);
+      }
+    }
+    return null;
+  }
+
+  /**
    * Add a block to the file. Returns a reference to the added block.
    */
   private static BlockInfoContiguous addBlock(
@@ -314,6 +541,41 @@ class FSDirWriteFileOp {
     }
   }
 
+  /**
+   * Add the given filename to the fs.
+   * @return the new INodesInPath instance that contains the new INode
+   */
+  private static INodesInPath addFile(
+      FSDirectory fsd, INodesInPath existing, String localName,
+      PermissionStatus permissions, short replication, long preferredBlockSize,
+      String clientName, String clientMachine)
+      throws IOException {
+
+    long modTime = now();
+    INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
+                                     modTime, modTime, replication, preferredBlockSize);
+    newNode.setLocalName(localName.getBytes(Charsets.UTF_8));
+    newNode.toUnderConstruction(clientName, clientMachine);
+
+    INodesInPath newiip;
+    fsd.writeLock();
+    try {
+      newiip = fsd.addINode(existing, newNode);
+    } finally {
+      fsd.writeUnlock();
+    }
+    if (newiip == null) {
+      NameNode.stateChangeLog.info("DIR* addFile: failed to add " +
+                                       existing.getPath() + "/" + localName);
+      return null;
+    }
+
+    if(NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* addFile: " + localName + " is added");
+    }
+    return newiip;
+  }
+
   private static FileState analyzeFileState(
       FSNamesystem fsn, String src, long fileId, String clientName,
       ExtendedBlock previous, LocatedBlock[] onRetryBlock)
@@ -345,8 +607,7 @@ class FSDirWriteFileOp {
         src = iip.getPath();
       }
     }
-    final INodeFile file = fsn.checkLease(src, clientName,
-                                                 inode, fileId);
+    final INodeFile file = fsn.checkLease(src, clientName, inode, fileId);
     BlockInfoContiguous lastBlockInFile = file.getLastBlock();
     if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
       // The block that the client claims is the current last block
@@ -497,6 +758,20 @@ class FSDirWriteFileOp {
     return true;
   }
 
+  private static INodeFile newINodeFile(
+      long id, PermissionStatus permissions, long mtime, long atime,
+      short replication, long preferredBlockSize, byte storagePolicyId) {
+    return new INodeFile(id, null, permissions, mtime, atime,
+        BlockInfoContiguous.EMPTY_ARRAY, replication, preferredBlockSize,
+        storagePolicyId);
+  }
+
+  private static INodeFile newINodeFile(long id, PermissionStatus permissions,
+      long mtime, long atime, short replication, long preferredBlockSize) {
+    return newINodeFile(id, permissions, mtime, atime, replication,
+        preferredBlockSize, (byte)0);
+  }
+
   /**
    * Persist the new block (the last block of the given file).
    */
@@ -533,6 +808,36 @@ class FSDirWriteFileOp {
     DatanodeStorageInfo.incrementBlocksScheduled(targets);
   }
 
+  private static void setNewINodeStoragePolicy(BlockManager bm, INodeFile
+      inode, INodesInPath iip, boolean isLazyPersist)
+      throws IOException {
+
+    if (isLazyPersist) {
+      BlockStoragePolicy lpPolicy =
+          bm.getStoragePolicy("LAZY_PERSIST");
+
+      // Set LAZY_PERSIST storage policy if the flag was passed to
+      // CreateFile.
+      if (lpPolicy == null) {
+        throw new HadoopIllegalArgumentException(
+            "The LAZY_PERSIST storage policy has been disabled " +
+            "by the administrator.");
+      }
+      inode.setStoragePolicyID(lpPolicy.getId(),
+                                 iip.getLatestSnapshotId());
+    } else {
+      BlockStoragePolicy effectivePolicy =
+          bm.getStoragePolicy(inode.getStoragePolicyID());
+
+      if (effectivePolicy != null &&
+          effectivePolicy.isCopyOnCreateFile()) {
+        // Copy effective policy from ancestor directory to current file.
+        inode.setStoragePolicyID(effectivePolicy.getId(),
+                                 iip.getLatestSnapshotId());
+      }
+    }
+  }
+
   private static class FileState {
     final INodeFile inode;
     final String path;
@@ -560,4 +865,19 @@ class FSDirWriteFileOp {
       this.clientMachine = clientMachine;
     }
   }
+
+  static class EncryptionKeyInfo {
+    final CryptoProtocolVersion protocolVersion;
+    final CipherSuite suite;
+    final String ezKeyName;
+    KeyProviderCryptoExtension.EncryptedKeyVersion edek;
+
+    EncryptionKeyInfo(
+        CryptoProtocolVersion protocolVersion, CipherSuite suite,
+        String ezKeyName) {
+      this.protocolVersion = protocolVersion;
+      this.suite = suite;
+      this.ezKeyName = ezKeyName;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b6bcfda/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index c2ed956..8fdd2d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -21,13 +21,11 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.commons.io.Charsets;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CipherSuite;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
@@ -42,7 +40,6 @@ import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.XAttrHelper;
-import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
@@ -86,7 +83,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KE
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_FILE_ENCRYPTION_INFO;
 import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID;
-import static org.apache.hadoop.util.Time.now;
 
 /**
  * Both FSDirectory and FSNamesystem manage the state of the namespace.
@@ -388,93 +384,6 @@ public class FSDirectory implements Closeable {
     skipQuotaCheck = true;
   }
 
-  private static INodeFile newINodeFile(long id, PermissionStatus permissions,
-      long mtime, long atime, short replication, long preferredBlockSize) {
-    return newINodeFile(id, permissions, mtime, atime, replication,
-        preferredBlockSize, (byte)0);
-  }
-
-  private static INodeFile newINodeFile(long id, PermissionStatus permissions,
-      long mtime, long atime, short replication, long preferredBlockSize,
-      byte storagePolicyId) {
-    return new INodeFile(id, null, permissions, mtime, atime,
-        BlockInfoContiguous.EMPTY_ARRAY, replication, preferredBlockSize,
-        storagePolicyId);
-  }
-
-  /**
-   * Add the given filename to the fs.
-   * @return the new INodesInPath instance that contains the new INode
-   */
-  INodesInPath addFile(INodesInPath existing, String localName, PermissionStatus
-      permissions, short replication, long preferredBlockSize,
-      String clientName, String clientMachine)
-    throws FileAlreadyExistsException, QuotaExceededException,
-      UnresolvedLinkException, SnapshotAccessControlException, AclException {
-
-    long modTime = now();
-    INodeFile newNode = newINodeFile(allocateNewInodeId(), permissions, modTime,
-        modTime, replication, preferredBlockSize);
-    newNode.setLocalName(localName.getBytes(Charsets.UTF_8));
-    newNode.toUnderConstruction(clientName, clientMachine);
-
-    INodesInPath newiip;
-    writeLock();
-    try {
-      newiip = addINode(existing, newNode);
-    } finally {
-      writeUnlock();
-    }
-    if (newiip == null) {
-      NameNode.stateChangeLog.info("DIR* addFile: failed to add " +
-          existing.getPath() + "/" + localName);
-      return null;
-    }
-
-    if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* addFile: " + localName + " is added");
-    }
-    return newiip;
-  }
-
-  INodeFile addFileForEditLog(long id, INodesInPath existing, byte[] localName,
-      PermissionStatus permissions, List<AclEntry> aclEntries,
-      List<XAttr> xAttrs, short replication, long modificationTime, long atime,
-      long preferredBlockSize, boolean underConstruction, String clientName,
-      String clientMachine, byte storagePolicyId) {
-    final INodeFile newNode;
-    assert hasWriteLock();
-    if (underConstruction) {
-      newNode = newINodeFile(id, permissions, modificationTime,
-          modificationTime, replication, preferredBlockSize, storagePolicyId);
-      newNode.toUnderConstruction(clientName, clientMachine);
-    } else {
-      newNode = newINodeFile(id, permissions, modificationTime, atime,
-          replication, preferredBlockSize, storagePolicyId);
-    }
-
-    newNode.setLocalName(localName);
-    try {
-      INodesInPath iip = addINode(existing, newNode);
-      if (iip != null) {
-        if (aclEntries != null) {
-          AclStorage.updateINodeAcl(newNode, aclEntries, CURRENT_STATE_ID);
-        }
-        if (xAttrs != null) {
-          XAttrStorage.updateINodeXAttrs(newNode, xAttrs, CURRENT_STATE_ID);
-        }
-        return newNode;
-      }
-    } catch (IOException e) {
-      if(NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug(
-            "DIR* FSDirectory.unprotectedAddFile: exception when add "
-                + existing.getPath() + " to the file system", e);
-      }
-    }
-    return null;
-  }
-
   /**
    * This is a wrapper for resolvePath(). If the path passed
    * is prefixed with /.reserved/raw, then it checks to ensure that the caller

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b6bcfda/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index dec1298..476ff36 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -364,15 +364,12 @@ public class FSEditLogLoader {
 
         // add to the file tree
         inodeId = getAndUpdateLastInodeId(addCloseOp.inodeId, logVersion, lastInodeId);
-        newFile = fsDir.addFileForEditLog(inodeId, iip.getExistingINodes(),
-            iip.getLastLocalName(),
-            addCloseOp.permissions,
-            addCloseOp.aclEntries,
-            addCloseOp.xAttrs, replication,
-            addCloseOp.mtime, addCloseOp.atime,
-            addCloseOp.blockSize, true,
-            addCloseOp.clientName,
-            addCloseOp.clientMachine,
+        newFile = FSDirWriteFileOp.addFileForEditLog(fsDir, inodeId,
+            iip.getExistingINodes(), iip.getLastLocalName(),
+            addCloseOp.permissions, addCloseOp.aclEntries,
+            addCloseOp.xAttrs, replication, addCloseOp.mtime,
+            addCloseOp.atime, addCloseOp.blockSize, true,
+            addCloseOp.clientName, addCloseOp.clientMachine,
             addCloseOp.storagePolicyId);
         iip = INodesInPath.replace(iip, iip.length() - 1, newFile);
         fsNamesys.leaseManager.addLease(addCloseOp.clientName, newFile.getId());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b6bcfda/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 7e5b981..bfd6eba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -151,7 +151,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.XAttr;
@@ -275,7 +274,6 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
-import org.apache.hadoop.util.ChunkedArrayList;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -2279,8 +2277,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * @return chosen protocol version
    * @throws IOException
    */
-  private CryptoProtocolVersion chooseProtocolVersion(EncryptionZone zone,
-      CryptoProtocolVersion[] supportedVersions)
+  CryptoProtocolVersion chooseProtocolVersion(
+      EncryptionZone zone, CryptoProtocolVersion[] supportedVersions)
       throws UnknownCryptoProtocolVersionException, UnresolvedLinkException,
         SnapshotAccessControlException {
     Preconditions.checkNotNull(zone);
@@ -2342,11 +2340,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       String holder, String clientMachine, EnumSet<CreateFlag> flag,
       boolean createParent, short replication, long blockSize, 
       CryptoProtocolVersion[] supportedVersions, boolean logRetryCache)
-      throws AccessControlException, SafeModeException,
-      FileAlreadyExistsException, UnresolvedLinkException,
-      FileNotFoundException, ParentNotDirectoryException, IOException {
+      throws IOException {
 
-    HdfsFileStatus status = null;
+    HdfsFileStatus status;
     try {
       status = startFileInt(src, permissions, holder, clientMachine, flag,
           createParent, replication, blockSize, supportedVersions,
@@ -2355,54 +2351,42 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       logAuditEvent(false, "create", src);
       throw e;
     }
+    logAuditEvent(true, "create", src, null, status);
     return status;
   }
 
-  private HdfsFileStatus startFileInt(final String srcArg,
+  private HdfsFileStatus startFileInt(final String src,
       PermissionStatus permissions, String holder, String clientMachine,
       EnumSet<CreateFlag> flag, boolean createParent, short replication,
       long blockSize, CryptoProtocolVersion[] supportedVersions,
       boolean logRetryCache)
-      throws AccessControlException, SafeModeException,
-      FileAlreadyExistsException, UnresolvedLinkException,
-      FileNotFoundException, ParentNotDirectoryException, IOException {
-    String src = srcArg;
+      throws IOException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       StringBuilder builder = new StringBuilder();
-      builder.append("DIR* NameSystem.startFile: src=" + src
-              + ", holder=" + holder
-              + ", clientMachine=" + clientMachine
-              + ", createParent=" + createParent
-              + ", replication=" + replication
-              + ", createFlag=" + flag.toString()
-              + ", blockSize=" + blockSize);
-      builder.append(", supportedVersions=");
-      if (supportedVersions != null) {
-        builder.append(Arrays.toString(supportedVersions));
-      } else {
-        builder.append("null");
-      }
+      builder.append("DIR* NameSystem.startFile: src=").append(src)
+          .append(", holder=").append(holder)
+          .append(", clientMachine=").append(clientMachine)
+          .append(", createParent=").append(createParent)
+          .append(", replication=").append(replication)
+          .append(", createFlag=").append(flag.toString())
+          .append(", blockSize=").append(blockSize)
+          .append(", supportedVersions=")
+          .append(supportedVersions == null ? null : Arrays.toString
+              (supportedVersions));
       NameNode.stateChangeLog.debug(builder.toString());
     }
     if (!DFSUtil.isValidName(src)) {
       throw new InvalidPathException(src);
     }
     blockManager.verifyReplication(src, replication, clientMachine);
-
-    boolean skipSync = false;
-    HdfsFileStatus stat = null;
-    FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.WRITE);
     if (blockSize < minBlockSize) {
       throw new IOException("Specified block size is less than configured" +
           " minimum value (" + DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY
           + "): " + blockSize + " < " + minBlockSize);
     }
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
-    boolean create = flag.contains(CreateFlag.CREATE);
-    boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
-    boolean isLazyPersist = flag.contains(CreateFlag.LAZY_PERSIST);
 
+    FSPermissionChecker pc = getPermissionChecker();
     waitForLoadingFSImage();
 
     /**
@@ -2417,246 +2401,62 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
      * special RetryStartFileException to ask the DFSClient to try the create
      * again later.
      */
-    CryptoProtocolVersion protocolVersion = null;
-    CipherSuite suite = null;
-    String ezKeyName = null;
-    EncryptedKeyVersion edek = null;
+    FSDirWriteFileOp.EncryptionKeyInfo ezInfo = null;
 
     if (provider != null) {
       readLock();
       try {
-        src = dir.resolvePath(pc, src, pathComponents);
-        INodesInPath iip = dir.getINodesInPath4Write(src);
-        // Nothing to do if the path is not within an EZ
-        final EncryptionZone zone = dir.getEZForPath(iip);
-        if (zone != null) {
-          protocolVersion = chooseProtocolVersion(zone, supportedVersions);
-          suite = zone.getSuite();
-          ezKeyName = zone.getKeyName();
-
-          Preconditions.checkNotNull(protocolVersion);
-          Preconditions.checkNotNull(suite);
-          Preconditions.checkArgument(!suite.equals(CipherSuite.UNKNOWN),
-              "Chose an UNKNOWN CipherSuite!");
-          Preconditions.checkNotNull(ezKeyName);
-        }
+        checkOperation(OperationCategory.READ);
+        ezInfo = FSDirWriteFileOp
+            .getEncryptionKeyInfo(this, pc, src, supportedVersions);
       } finally {
         readUnlock();
       }
 
-      Preconditions.checkState(
-          (suite == null && ezKeyName == null) ||
-              (suite != null && ezKeyName != null),
-          "Both suite and ezKeyName should both be null or not null");
-
       // Generate EDEK if necessary while not holding the lock
-      edek = generateEncryptedDataEncryptionKey(ezKeyName);
+      if (ezInfo != null) {
+        ezInfo.edek = generateEncryptedDataEncryptionKey(ezInfo.ezKeyName);
+      }
       EncryptionFaultInjector.getInstance().startFileAfterGenerateKey();
     }
 
-    // Proceed with the create, using the computed cipher suite and 
+    boolean skipSync = false;
+    HdfsFileStatus stat = null;
+
+    // Proceed with the create, using the computed cipher suite and
     // generated EDEK
-    BlocksMapUpdateInfo toRemoveBlocks = null;
+    BlocksMapUpdateInfo toRemoveBlocks = new BlocksMapUpdateInfo();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot create file" + src);
       dir.writeLock();
       try {
-        src = dir.resolvePath(pc, src, pathComponents);
-        final INodesInPath iip = dir.getINodesInPath4Write(src);
-        toRemoveBlocks = startFileInternal(
-            pc, iip, permissions, holder,
-            clientMachine, create, overwrite,
-            createParent, replication, blockSize,
-            isLazyPersist, suite, protocolVersion, edek,
-            logRetryCache);
-        stat = FSDirStatAndListingOp.getFileInfo(
-            dir, src, false, FSDirectory.isReservedRawName(srcArg), true);
+        stat = FSDirWriteFileOp.startFile(this, pc, src, permissions, holder,
+                                          clientMachine, flag, createParent,
+                                          replication, blockSize, ezInfo,
+                                          toRemoveBlocks, logRetryCache);
       } finally {
         dir.writeUnlock();
       }
-    } catch (StandbyException se) {
-      skipSync = true;
-      throw se;
+    } catch (IOException e) {
+      skipSync = e instanceof StandbyException;
+      throw e;
     } finally {
       writeUnlock();
       // There might be transactions logged while trying to recover the lease.
       // They need to be sync'ed even when an exception was thrown.
       if (!skipSync) {
         getEditLog().logSync();
-        if (toRemoveBlocks != null) {
-          removeBlocks(toRemoveBlocks);
-          toRemoveBlocks.clear();
-        }
+        removeBlocks(toRemoveBlocks);
+        toRemoveBlocks.clear();
       }
     }
 
-    logAuditEvent(true, "create", srcArg, null, stat);
     return stat;
   }
 
   /**
-   * Create a new file or overwrite an existing file<br>
-   * 
-   * Once the file is create the client then allocates a new block with the next
-   * call using {@link ClientProtocol#addBlock}.
-   * <p>
-   * For description of parameters and exceptions thrown see
-   * {@link ClientProtocol#create}
-   */
-  private BlocksMapUpdateInfo startFileInternal(FSPermissionChecker pc, 
-      INodesInPath iip, PermissionStatus permissions, String holder,
-      String clientMachine, boolean create, boolean overwrite, 
-      boolean createParent, short replication, long blockSize, 
-      boolean isLazyPersist, CipherSuite suite, CryptoProtocolVersion version,
-      EncryptedKeyVersion edek, boolean logRetryEntry)
-      throws IOException {
-    assert hasWriteLock();
-    // Verify that the destination does not exist as a directory already.
-    final INode inode = iip.getLastINode();
-    final String src = iip.getPath();
-    if (inode != null && inode.isDirectory()) {
-      throw new FileAlreadyExistsException(src +
-          " already exists as a directory");
-    }
-
-    final INodeFile myFile = INodeFile.valueOf(inode, src, true);
-    if (isPermissionEnabled) {
-      if (overwrite && myFile != null) {
-        dir.checkPathAccess(pc, iip, FsAction.WRITE);
-      }
-      /*
-       * To overwrite existing file, need to check 'w' permission 
-       * of parent (equals to ancestor in this case)
-       */
-      dir.checkAncestorAccess(pc, iip, FsAction.WRITE);
-    }
-    if (!createParent) {
-      dir.verifyParentDir(iip, src);
-    }
-
-    FileEncryptionInfo feInfo = null;
-
-    final EncryptionZone zone = dir.getEZForPath(iip);
-    if (zone != null) {
-      // The path is now within an EZ, but we're missing encryption parameters
-      if (suite == null || edek == null) {
-        throw new RetryStartFileException();
-      }
-      // Path is within an EZ and we have provided encryption parameters.
-      // Make sure that the generated EDEK matches the settings of the EZ.
-      final String ezKeyName = zone.getKeyName();
-      if (!ezKeyName.equals(edek.getEncryptionKeyName())) {
-        throw new RetryStartFileException();
-      }
-      feInfo = new FileEncryptionInfo(suite, version,
-          edek.getEncryptedKeyVersion().getMaterial(),
-          edek.getEncryptedKeyIv(),
-          ezKeyName, edek.getEncryptionKeyVersionName());
-    }
-
-    try {
-      BlocksMapUpdateInfo toRemoveBlocks = null;
-      if (myFile == null) {
-        if (!create) {
-          throw new FileNotFoundException("Can't overwrite non-existent " +
-              src + " for client " + clientMachine);
-        }
-      } else {
-        if (overwrite) {
-          toRemoveBlocks = new BlocksMapUpdateInfo();
-          List<INode> toRemoveINodes = new ChunkedArrayList<>();
-          List<Long> toRemoveUCFiles = new ChunkedArrayList<>();
-          long ret = FSDirDeleteOp.delete(
-              dir, iip, toRemoveBlocks, toRemoveINodes,
-              toRemoveUCFiles, now());
-          if (ret >= 0) {
-            iip = INodesInPath.replace(iip, iip.length() - 1, null);
-            FSDirDeleteOp.incrDeletedFileCount(ret);
-            removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes, true);
-          }
-        } else {
-          // If lease soft limit time is expired, recover the lease
-          recoverLeaseInternal(RecoverLeaseOp.CREATE_FILE,
-              iip, src, holder, clientMachine, false);
-          throw new FileAlreadyExistsException(src + " for client " +
-              clientMachine + " already exists");
-        }
-      }
-
-      checkFsObjectLimit();
-      INodeFile newNode = null;
-
-      // Always do an implicit mkdirs for parent directory tree.
-      Map.Entry<INodesInPath, String> parent = FSDirMkdirOp
-          .createAncestorDirectories(dir, iip, permissions);
-      if (parent != null) {
-        iip = dir.addFile(parent.getKey(), parent.getValue(), permissions,
-            replication, blockSize, holder, clientMachine);
-        newNode = iip != null ? iip.getLastINode().asFile() : null;
-      }
-
-      if (newNode == null) {
-        throw new IOException("Unable to add " + src +  " to namespace");
-      }
-      leaseManager.addLease(newNode.getFileUnderConstructionFeature()
-          .getClientName(), newNode.getId());
-
-      // Set encryption attributes if necessary
-      if (feInfo != null) {
-        dir.setFileEncryptionInfo(src, feInfo);
-        newNode = dir.getInode(newNode.getId()).asFile();
-      }
-
-      setNewINodeStoragePolicy(newNode, iip, isLazyPersist);
-
-      // record file record in log, record new generation stamp
-      getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
-      if (NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
-            src + " inode " + newNode.getId() + " " + holder);
-      }
-      return toRemoveBlocks;
-    } catch (IOException ie) {
-      NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: " + src + " " +
-                                       ie.getMessage());
-      throw ie;
-    }
-  }
-
-  private void setNewINodeStoragePolicy(INodeFile inode,
-                                        INodesInPath iip,
-                                        boolean isLazyPersist)
-      throws IOException {
-
-    if (isLazyPersist) {
-      BlockStoragePolicy lpPolicy =
-          blockManager.getStoragePolicy("LAZY_PERSIST");
-
-      // Set LAZY_PERSIST storage policy if the flag was passed to
-      // CreateFile.
-      if (lpPolicy == null) {
-        throw new HadoopIllegalArgumentException(
-            "The LAZY_PERSIST storage policy has been disabled " +
-            "by the administrator.");
-      }
-      inode.setStoragePolicyID(lpPolicy.getId(),
-                                 iip.getLatestSnapshotId());
-    } else {
-      BlockStoragePolicy effectivePolicy =
-          blockManager.getStoragePolicy(inode.getStoragePolicyID());
-
-      if (effectivePolicy != null &&
-          effectivePolicy.isCopyOnCreateFile()) {
-        // Copy effective policy from ancestor directory to current file.
-        inode.setStoragePolicyID(effectivePolicy.getId(),
-                                 iip.getLatestSnapshotId());
-      }
-    }
-  }
-
-  /**
    * Append to an existing file for append.
    * <p>
    * 
@@ -2871,7 +2671,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return false;
   }
 
-  private enum RecoverLeaseOp {
+  enum RecoverLeaseOp {
     CREATE_FILE,
     APPEND_FILE,
     TRUNCATE_FILE,


Mime
View raw message