hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From whe...@apache.org
Subject hadoop git commit: HDFS-7436. Consolidate implementation of concat(). Contributed by Haohui Mai.
Date Mon, 24 Nov 2014 23:49:30 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk e37a4ff0c -> 8caf537af


HDFS-7436. Consolidate implementation of concat(). Contributed by Haohui Mai.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8caf537a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8caf537a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8caf537a

Branch: refs/heads/trunk
Commit: 8caf537afabc70b0c74e0a29aea0cc2935ecb162
Parents: e37a4ff
Author: Haohui Mai <wheat9@apache.org>
Authored: Mon Nov 24 15:42:38 2014 -0800
Committer: Haohui Mai <wheat9@apache.org>
Committed: Mon Nov 24 15:42:38 2014 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../hdfs/server/namenode/FSDirConcatOp.java     | 233 +++++++++++++++++++
 .../hdfs/server/namenode/FSDirectory.java       |  99 ++------
 .../hdfs/server/namenode/FSEditLogLoader.java   |   2 +-
 .../hdfs/server/namenode/FSNamesystem.java      | 174 +-------------
 5 files changed, 268 insertions(+), 242 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8caf537a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 2ba21fc..e2116b0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -393,6 +393,8 @@ Release 2.7.0 - UNRELEASED
     HDFS-7419. Improve error messages for DataNode hot swap drive feature (Lei
     Xu via Colin P. Mccabe)
 
+    HDFS-7436. Consolidate implementation of concat(). (wheat9)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8caf537a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
new file mode 100644
index 0000000..12feb33
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.SnapshotException;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.hadoop.util.Time.now;
+
+class FSDirConcatOp {
+  static HdfsFileStatus concat(
+    FSDirectory fsd, String target, String[] srcs,
+    boolean logRetryCache) throws IOException {
+    Preconditions.checkArgument(!target.isEmpty(), "Target file name is empty");
+    Preconditions.checkArgument(srcs != null && srcs.length > 0,
+      "No sources given");
+    assert srcs != null;
+
+    FSDirectory.LOG.debug("concat {} to {}", Arrays.toString(srcs), target);
+    // We require all files be in the same directory
+    String trgParent =
+      target.substring(0, target.lastIndexOf(Path.SEPARATOR_CHAR));
+    for (String s : srcs) {
+      String srcParent = s.substring(0, s.lastIndexOf(Path.SEPARATOR_CHAR));
+      if (!srcParent.equals(trgParent)) {
+        throw new IllegalArgumentException(
+           "Sources and target are not in the same directory");
+      }
+    }
+
+    // write permission for the target
+    if (fsd.isPermissionEnabled()) {
+      FSPermissionChecker pc = fsd.getPermissionChecker();
+      fsd.checkPathAccess(pc, target, FsAction.WRITE);
+
+      // and srcs
+      for(String aSrc: srcs) {
+        fsd.checkPathAccess(pc, aSrc, FsAction.READ); // read the file
+        fsd.checkParentAccess(pc, aSrc, FsAction.WRITE); // for delete
+      }
+    }
+
+    // to make sure no two files are the same
+    Set<INode> si = new HashSet<INode>();
+
+    // we put the following prerequisite for the operation
+    // replication and blocks sizes should be the same for ALL the blocks
+
+    // check the target
+    final INodesInPath trgIip = fsd.getINodesInPath4Write(target);
+    if (fsd.getEZForPath(trgIip) != null) {
+      throw new HadoopIllegalArgumentException(
+          "concat can not be called for files in an encryption zone.");
+    }
+    final INodeFile trgInode = INodeFile.valueOf(trgIip.getLastINode(), target);
+    if(trgInode.isUnderConstruction()) {
+      throw new HadoopIllegalArgumentException("concat: target file "
+          + target + " is under construction");
+    }
+    // per design target shouldn't be empty and all the blocks same size
+    if(trgInode.numBlocks() == 0) {
+      throw new HadoopIllegalArgumentException("concat: target file "
+          + target + " is empty");
+    }
+    if (trgInode.isWithSnapshot()) {
+      throw new HadoopIllegalArgumentException("concat: target file "
+          + target + " is in a snapshot");
+    }
+
+    long blockSize = trgInode.getPreferredBlockSize();
+
+    // check the end block to be full
+    final BlockInfo last = trgInode.getLastBlock();
+    if(blockSize != last.getNumBytes()) {
+      throw new HadoopIllegalArgumentException("The last block in " + target
+          + " is not full; last block size = " + last.getNumBytes()
+          + " but file block size = " + blockSize);
+    }
+
+    si.add(trgInode);
+    final short repl = trgInode.getFileReplication();
+
+    // now check the srcs
+    boolean endSrc = false; // final src file doesn't have to have full end block
+    for(int i=0; i< srcs.length; i++) {
+      String src = srcs[i];
+      if(i== srcs.length-1)
+        endSrc=true;
+
+      final INodeFile srcInode = INodeFile.valueOf(fsd.getINode4Write(src), src);
+      if(src.isEmpty()
+          || srcInode.isUnderConstruction()
+          || srcInode.numBlocks() == 0) {
+        throw new HadoopIllegalArgumentException("concat: source file " + src
+            + " is invalid or empty or underConstruction");
+      }
+
+      // check replication and blocks size
+      if(repl != srcInode.getBlockReplication()) {
+        throw new HadoopIllegalArgumentException("concat: the source file "
+            + src + " and the target file " + target
+            + " should have the same replication: source replication is "
+            + srcInode.getBlockReplication()
+            + " but target replication is " + repl);
+      }
+
+      //boolean endBlock=false;
+      // verify that all the blocks are of the same length as target
+      // should be enough to check the end blocks
+      final BlockInfo[] srcBlocks = srcInode.getBlocks();
+      int idx = srcBlocks.length-1;
+      if(endSrc)
+        idx = srcBlocks.length-2; // end block of endSrc is OK not to be full
+      if(idx >= 0 && srcBlocks[idx].getNumBytes() != blockSize) {
+        throw new HadoopIllegalArgumentException("concat: the source file "
+            + src + " and the target file " + target
+            + " should have the same blocks sizes: target block size is "
+            + blockSize + " but the size of source block " + idx + " is "
+            + srcBlocks[idx].getNumBytes());
+      }
+
+      si.add(srcInode);
+    }
+
+    // make sure no two files are the same
+    if(si.size() < srcs.length+1) { // trg + srcs
+      // it means at least two files are the same
+      throw new HadoopIllegalArgumentException(
+          "concat: at least two of the source files are the same");
+    }
+
+    if(NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " +
+          Arrays.toString(srcs) + " to " + target);
+    }
+
+    long timestamp = now();
+    fsd.writeLock();
+    try {
+      unprotectedConcat(fsd, target, srcs, timestamp);
+    } finally {
+      fsd.writeUnlock();
+    }
+    fsd.getEditLog().logConcat(target, srcs, timestamp, logRetryCache);
+    return fsd.getAuditFileInfo(target, false);
+  }
+
+  /**
+   * Concat all the blocks from srcs to trg and delete the srcs files
+   * @param fsd FSDirectory
+   * @param target target file to move the blocks to
+   * @param srcs list of file to move the blocks from
+   */
+  static void unprotectedConcat(
+    FSDirectory fsd, String target, String[] srcs, long timestamp)
+    throws IOException {
+    assert fsd.hasWriteLock();
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* FSNamesystem.concat to "+target);
+    }
+    // do the move
+
+    final INodesInPath trgIIP = fsd.getINodesInPath4Write(target, true);
+    final INode[] trgINodes = trgIIP.getINodes();
+    final INodeFile trgInode = trgIIP.getLastINode().asFile();
+    INodeDirectory trgParent = trgINodes[trgINodes.length-2].asDirectory();
+    final int trgLatestSnapshot = trgIIP.getLatestSnapshotId();
+
+    final INodeFile [] allSrcInodes = new INodeFile[srcs.length];
+    for(int i = 0; i < srcs.length; i++) {
+      final INodesInPath iip = fsd.getINodesInPath4Write(srcs[i]);
+      final int latest = iip.getLatestSnapshotId();
+      final INode inode = iip.getLastINode();
+
+      // check if the file in the latest snapshot
+      if (inode.isInLatestSnapshot(latest)) {
+        throw new SnapshotException("Concat: the source file " + srcs[i]
+            + " is in snapshot " + latest);
+      }
+
+      // check if the file has other references.
+      if (inode.isReference() && ((INodeReference.WithCount)
+          inode.asReference().getReferredINode()).getReferenceCount() > 1) {
+        throw new SnapshotException("Concat: the source file " + srcs[i]
+            + " is referred by some other reference in some snapshot.");
+      }
+
+      allSrcInodes[i] = inode.asFile();
+    }
+    trgInode.concatBlocks(allSrcInodes);
+
+    // since we are in the same dir - we can use same parent to remove files
+    int count = 0;
+    for(INodeFile nodeToRemove: allSrcInodes) {
+      if(nodeToRemove == null) continue;
+
+      nodeToRemove.setBlocks(null);
+      trgParent.removeChild(nodeToRemove, trgLatestSnapshot);
+      fsd.getINodeMap().remove(nodeToRemove);
+      count++;
+    }
+
+    trgInode.setModificationTime(timestamp, trgLatestSnapshot);
+    trgParent.updateModificationTime(timestamp, trgLatestSnapshot);
+    // update quota on the parent directory ('count' files removed, 0 space)
+    FSDirectory.unprotectedUpdateCount(trgIIP, trgINodes.length - 1, -count, 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8caf537a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 84e5028..0a94849 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -98,6 +98,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Both FSDirectory and FSNamesystem manage the state of the namespace.
@@ -108,6 +110,7 @@ import org.apache.hadoop.security.UserGroupInformation;
  **/
 @InterfaceAudience.Private
 public class FSDirectory implements Closeable {
+  static final Logger LOG = LoggerFactory.getLogger(FSDirectory.class);
   private static INodeDirectory createRoot(FSNamesystem namesystem) {
     final INodeDirectory r = new INodeDirectory(
         INodeId.ROOT_INODE_ID,
@@ -157,6 +160,8 @@ public class FSDirectory implements Closeable {
   private final String fsOwnerShortUserName;
   private final String supergroup;
 
+  private final FSEditLog editLog;
+
   // utility methods to acquire and release read lock and write lock
   void readLock() {
     this.dirLock.readLock().lock();
@@ -250,7 +255,7 @@ public class FSDirectory implements Closeable {
         + " times");
     nameCache = new NameCache<ByteArray>(threshold);
     namesystem = ns;
-
+    this.editLog = ns.getEditLog();
     ezManager = new EncryptionZoneManager(this, conf);
   }
     
@@ -267,6 +272,14 @@ public class FSDirectory implements Closeable {
     return rootDir;
   }
 
+  boolean isPermissionEnabled() {
+    return isPermissionEnabled;
+  }
+
+  FSEditLog getEditLog() {
+    return editLog;
+  }
+
   /**
    * Shutdown the filestore
    */
@@ -1174,81 +1187,6 @@ public class FSDirectory implements Closeable {
   }
 
   /**
-   * Concat all the blocks from srcs to trg and delete the srcs files
-   */
-  void concat(String target, String[] srcs, long timestamp)
-      throws UnresolvedLinkException, QuotaExceededException,
-      SnapshotAccessControlException, SnapshotException {
-    writeLock();
-    try {
-      // actual move
-      unprotectedConcat(target, srcs, timestamp);
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  /**
-   * Concat all the blocks from srcs to trg and delete the srcs files
-   * @param target target file to move the blocks to
-   * @param srcs list of file to move the blocks from
-   */
-  void unprotectedConcat(String target, String [] srcs, long timestamp) 
-      throws UnresolvedLinkException, QuotaExceededException,
-      SnapshotAccessControlException, SnapshotException {
-    assert hasWriteLock();
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* FSNamesystem.concat to "+target);
-    }
-    // do the move
-    
-    final INodesInPath trgIIP = getINodesInPath4Write(target, true);
-    final INode[] trgINodes = trgIIP.getINodes();
-    final INodeFile trgInode = trgIIP.getLastINode().asFile();
-    INodeDirectory trgParent = trgINodes[trgINodes.length-2].asDirectory();
-    final int trgLatestSnapshot = trgIIP.getLatestSnapshotId();
-    
-    final INodeFile [] allSrcInodes = new INodeFile[srcs.length];
-    for(int i = 0; i < srcs.length; i++) {
-      final INodesInPath iip = getINodesInPath4Write(srcs[i]);
-      final int latest = iip.getLatestSnapshotId();
-      final INode inode = iip.getLastINode();
-
-      // check if the file in the latest snapshot
-      if (inode.isInLatestSnapshot(latest)) {
-        throw new SnapshotException("Concat: the source file " + srcs[i]
-            + " is in snapshot " + latest);
-      }
-
-      // check if the file has other references.
-      if (inode.isReference() && ((INodeReference.WithCount)
-          inode.asReference().getReferredINode()).getReferenceCount() > 1) {
-        throw new SnapshotException("Concat: the source file " + srcs[i]
-            + " is referred by some other reference in some snapshot.");
-      }
-
-      allSrcInodes[i] = inode.asFile();
-    }
-    trgInode.concatBlocks(allSrcInodes);
-    
-    // since we are in the same dir - we can use same parent to remove files
-    int count = 0;
-    for(INodeFile nodeToRemove: allSrcInodes) {
-      if(nodeToRemove == null) continue;
-      
-      nodeToRemove.setBlocks(null);
-      trgParent.removeChild(nodeToRemove, trgLatestSnapshot);
-      inodeMap.remove(nodeToRemove);
-      count++;
-    }
-    
-    trgInode.setModificationTime(timestamp, trgLatestSnapshot);
-    trgParent.updateModificationTime(timestamp, trgLatestSnapshot);
-    // update quota on the parent directory ('count' files removed, 0 space)
-    unprotectedUpdateCount(trgIIP, trgINodes.length-1, -count, 0);
-  }
-
-  /**
    * Delete the target directory and collect the blocks under it
    * 
    * @param src Path of a directory to delete
@@ -1790,8 +1728,7 @@ public class FSDirectory implements Closeable {
    * updates quota without verification
    * callers responsibility is to make sure quota is not exceeded
    */
-  private static void unprotectedUpdateCount(INodesInPath inodesInPath,
-      int numOfINodes, long nsDelta, long dsDelta) {
+  static void unprotectedUpdateCount(INodesInPath inodesInPath, int numOfINodes, long nsDelta,
long dsDelta) {
     final INode[] inodes = inodesInPath.getINodes();
     for(int i=0; i < numOfINodes; i++) {
       if (inodes[i].isQuotaSet()) { // a directory with quota
@@ -3413,4 +3350,10 @@ public class FSDirectory implements Closeable {
       }
     }
   }
+
+  HdfsFileStatus getAuditFileInfo(String path, boolean resolveSymlink)
+    throws IOException {
+    return (namesystem.isAuditEnabled() && namesystem.isExternalInvocation())
+      ? getFileInfo(path, resolveSymlink, false, false) : null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8caf537a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 716768e..95dfefc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -490,7 +490,7 @@ public class FSEditLogLoader {
         srcs[i] =
             renameReservedPathsOnUpgrade(concatDeleteOp.srcs[i], logVersion);
       }
-      fsDir.unprotectedConcat(trg, srcs, concatDeleteOp.timestamp);
+      FSDirConcatOp.unprotectedConcat(fsDir, trg, srcs, concatDeleteOp.timestamp);
       
       if (toAddRetryCache) {
         fsNamesys.addCacheEntry(concatDeleteOp.rpcClientId,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8caf537a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 8dfc219..4059905 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -258,8 +258,6 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.RetryCache;
-import org.apache.hadoop.ipc.RetryCache.CacheEntry;
-import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.metrics2.annotation.Metric;
@@ -342,8 +340,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   private HdfsFileStatus getAuditFileInfo(String path, boolean resolveSymlink)
       throws IOException {
-    return (isAuditEnabled() && isExternalInvocation())
-        ? dir.getFileInfo(path, resolveSymlink, false, false) : null;
+    return dir.getAuditFileInfo(path, resolveSymlink);
   }
   
   private void logAuditEvent(boolean succeeded, String cmd, String src)
@@ -1944,175 +1941,26 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * @throws IOException on error
    */
   void concat(String target, String [] srcs, boolean logRetryCache)
-      throws IOException, UnresolvedLinkException {
-    if(FSNamesystem.LOG.isDebugEnabled()) {
-      FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) +
-          " to " + target);
-    }
-
-    try {
-      concatInt(target, srcs, logRetryCache);
-    } catch (AccessControlException e) {
-      logAuditEvent(false, "concat", Arrays.toString(srcs), target, null);
-      throw e;
-    }
-  }
-
-  private void concatInt(String target, String [] srcs, 
-      boolean logRetryCache) throws IOException, UnresolvedLinkException {
-    // verify args
-    if(target.isEmpty()) {
-      throw new IllegalArgumentException("Target file name is empty");
-    }
-    if(srcs == null || srcs.length == 0) {
-      throw new IllegalArgumentException("No sources given");
-    }
-    
-    // We require all files be in the same directory
-    String trgParent = 
-      target.substring(0, target.lastIndexOf(Path.SEPARATOR_CHAR));
-    for (String s : srcs) {
-      String srcParent = s.substring(0, s.lastIndexOf(Path.SEPARATOR_CHAR));
-      if (!srcParent.equals(trgParent)) {
-        throw new IllegalArgumentException(
-           "Sources and target are not in the same directory");
-      }
-    }
-
-    HdfsFileStatus resultingStat = null;
-    FSPermissionChecker pc = getPermissionChecker();
+      throws IOException {
     checkOperation(OperationCategory.WRITE);
     waitForLoadingFSImage();
+    HdfsFileStatus stat = null;
+    boolean success = false;
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot concat " + target);
-      concatInternal(pc, target, srcs, logRetryCache);
-      resultingStat = getAuditFileInfo(target, false);
+      stat = FSDirConcatOp.concat(dir, target, srcs, logRetryCache);
+      success = true;
     } finally {
       writeUnlock();
-    }
-    getEditLog().logSync();
-    logAuditEvent(true, "concat", Arrays.toString(srcs), target, resultingStat);
-  }
-
-  /** See {@link #concat(String, String[])} */
-  private void concatInternal(FSPermissionChecker pc, String target,
-      String[] srcs, boolean logRetryCache) throws IOException,
-      UnresolvedLinkException {
-    assert hasWriteLock();
-
-    // write permission for the target
-    if (isPermissionEnabled) {
-      checkPathAccess(pc, target, FsAction.WRITE);
-
-      // and srcs
-      for(String aSrc: srcs) {
-        checkPathAccess(pc, aSrc, FsAction.READ); // read the file
-        checkParentAccess(pc, aSrc, FsAction.WRITE); // for delete 
+      if (success) {
+        getEditLog().logSync();
       }
+      logAuditEvent(success, "concat", Arrays.toString(srcs), target, stat);
     }
-
-    // to make sure no two files are the same
-    Set<INode> si = new HashSet<INode>();
-
-    // we put the following prerequisite for the operation
-    // replication and blocks sizes should be the same for ALL the blocks
-
-    // check the target
-    final INodesInPath trgIip = dir.getINodesInPath4Write(target);
-    if (dir.getEZForPath(trgIip) != null) {
-      throw new HadoopIllegalArgumentException(
-          "concat can not be called for files in an encryption zone.");
-    }
-    final INodeFile trgInode = INodeFile.valueOf(trgIip.getLastINode(),
-        target);
-    if(trgInode.isUnderConstruction()) {
-      throw new HadoopIllegalArgumentException("concat: target file "
-          + target + " is under construction");
-    }
-    // per design target shouldn't be empty and all the blocks same size
-    if(trgInode.numBlocks() == 0) {
-      throw new HadoopIllegalArgumentException("concat: target file "
-          + target + " is empty");
-    }
-    if (trgInode.isWithSnapshot()) {
-      throw new HadoopIllegalArgumentException("concat: target file "
-          + target + " is in a snapshot");
-    }
-
-    long blockSize = trgInode.getPreferredBlockSize();
-
-    // check the end block to be full
-    final BlockInfo last = trgInode.getLastBlock();
-    if(blockSize != last.getNumBytes()) {
-      throw new HadoopIllegalArgumentException("The last block in " + target
-          + " is not full; last block size = " + last.getNumBytes()
-          + " but file block size = " + blockSize);
-    }
-
-    si.add(trgInode);
-    final short repl = trgInode.getFileReplication();
-
-    // now check the srcs
-    boolean endSrc = false; // final src file doesn't have to have full end block
-    for(int i=0; i<srcs.length; i++) {
-      String src = srcs[i];
-      if(i==srcs.length-1)
-        endSrc=true;
-
-      final INodeFile srcInode = INodeFile.valueOf(dir.getINode4Write(src), src);
-      if(src.isEmpty() 
-          || srcInode.isUnderConstruction()
-          || srcInode.numBlocks() == 0) {
-        throw new HadoopIllegalArgumentException("concat: source file " + src
-            + " is invalid or empty or underConstruction");
-      }
-
-      // check replication and blocks size
-      if(repl != srcInode.getBlockReplication()) {
-        throw new HadoopIllegalArgumentException("concat: the source file "
-            + src + " and the target file " + target
-            + " should have the same replication: source replication is "
-            + srcInode.getBlockReplication()
-            + " but target replication is " + repl);
-      }
-
-      //boolean endBlock=false;
-      // verify that all the blocks are of the same length as target
-      // should be enough to check the end blocks
-      final BlockInfo[] srcBlocks = srcInode.getBlocks();
-      int idx = srcBlocks.length-1;
-      if(endSrc)
-        idx = srcBlocks.length-2; // end block of endSrc is OK not to be full
-      if(idx >= 0 && srcBlocks[idx].getNumBytes() != blockSize) {
-        throw new HadoopIllegalArgumentException("concat: the source file "
-            + src + " and the target file " + target
-            + " should have the same blocks sizes: target block size is "
-            + blockSize + " but the size of source block " + idx + " is "
-            + srcBlocks[idx].getNumBytes());
-      }
-
-      si.add(srcInode);
-    }
-
-    // make sure no two files are the same
-    if(si.size() < srcs.length+1) { // trg + srcs
-      // it means at least two files are the same
-      throw new HadoopIllegalArgumentException(
-          "concat: at least two of the source files are the same");
-    }
-
-    if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " + 
-          Arrays.toString(srcs) + " to " + target);
-    }
-
-    long timestamp = now();
-    dir.concat(target, srcs, timestamp);
-    getEditLog().logConcat(target, srcs, timestamp, logRetryCache);
   }
-  
+
   /**
    * stores the modification and access time for this inode. 
    * The access time is precise up to an hour. The transaction, if needed, is
@@ -7240,7 +7088,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * Client invoked methods are invoked over RPC and will be in 
    * RPC call context even if the client exits.
    */
-  private boolean isExternalInvocation() {
+  boolean isExternalInvocation() {
     return Server.isRpcInvocation() || NamenodeWebHdfsMethods.isWebHdfsInvocation();
   }
 


Mime
View raw message