hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ste...@apache.org
Subject svn commit: r885143 [12/18] - in /hadoop/hdfs/branches/HDFS-326: ./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/ src/ant/org/apache/hadoop/ant/ src/ant/org/apache/hadoop/ant/condition/ src/c++/ src/c++/libhdfs/ src/c++/libhdfs/docs...
Date Sat, 28 Nov 2009 20:06:08 GMT
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Sat Nov 28 20:05:56 2009
@@ -17,22 +17,35 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.io.*;
+import java.io.Closeable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.net.URI;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
 
 /*************************************************
  * FSDirectory stores the filesystem directory state.
@@ -54,7 +67,8 @@
   /** Access an existing dfs name directory. */
   FSDirectory(FSNamesystem ns, Configuration conf) {
     this(new FSImage(), ns, conf);
-    if(conf.getBoolean("dfs.name.dir.restore", false)) {
+    if(conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY, 
+                       DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT)) {
       NameNode.LOG.info("set FSImage.restoreFailedStorage");
       fsImage.setRestoreFailedStorage(true);
     }
@@ -82,7 +96,7 @@
   private void initialize(Configuration conf) {
     MetricsContext metricsContext = MetricsUtil.getContext("dfs");
     directoryMetrics = MetricsUtil.createRecord(metricsContext, "FSDirectory");
-    directoryMetrics.setTag("sessionId", conf.get("session.id"));
+    directoryMetrics.setTag("sessionId", conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY));
   }
 
   void loadFSImage(Collection<URI> dataDirs,
@@ -184,7 +198,7 @@
    */
   INode unprotectedAddFile( String path, 
                             PermissionStatus permissions,
-                            Block[] blocks, 
+                            BlockInfo[] blocks, 
                             short replication,
                             long modificationTime,
                             long atime,
@@ -254,7 +268,8 @@
         // Add file->block mapping
         INodeFile newF = (INodeFile)newNode;
         for (int i = 0; i < nrBlocks; i++) {
-          newF.setBlock(i, getBlockManager().addINode(blocks[i], newF));
+          BlockInfo blockInfo = new BlockInfo(blocks[i], newF.getReplication());
+          newF.setBlock(i, getBlockManager().addINode(blockInfo, newF));
         }
       }
     }
@@ -264,27 +279,39 @@
   /**
    * Add a block to the file. Returns a reference to the added block.
    */
-  Block addBlock(String path, INode[] inodes, Block block
-      ) throws QuotaExceededException  {
+  BlockInfo addBlock(String path,
+                     INode[] inodes,
+                     Block block,
+                     DatanodeDescriptor targets[]
+  ) throws QuotaExceededException, IOException  {
     waitForReady();
 
     synchronized (rootDir) {
-      INodeFile fileNode = (INodeFile) inodes[inodes.length-1];
+      assert inodes[inodes.length-1].isUnderConstruction() :
+        "INode should correspond to a file under construction";
+      INodeFileUnderConstruction fileINode = 
+        (INodeFileUnderConstruction)inodes[inodes.length-1];
 
       // check quota limits and updated space consumed
-      updateCount(inodes, inodes.length-1, 0, 
-                  fileNode.getPreferredBlockSize()*fileNode.getReplication());
-      
-      // associate the new list of blocks with this file
-      BlockInfo blockInfo = getBlockManager().addINode(block, fileNode);
-      fileNode.addBlock(blockInfo);
+      updateCount(inodes, inodes.length-1, 0,
+          fileINode.getPreferredBlockSize()*fileINode.getReplication(), true);
+
+      // associate new last block for the file
+      BlockInfoUnderConstruction blockInfo =
+        new BlockInfoUnderConstruction(
+            block,
+            fileINode.getReplication(),
+            BlockUCState.UNDER_CONSTRUCTION,
+            targets);
+      getBlockManager().addINode(blockInfo, fileINode);
+      fileINode.addBlock(blockInfo);
 
       NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
                                     + path + " with " + block
                                     + " block is added to the in-memory "
                                     + "file system");
+      return blockInfo;
     }
-    return block;
   }
 
   /**
@@ -328,7 +355,7 @@
 
     synchronized (rootDir) {
       // modify file-> block and blocksMap
-      fileNode.removeBlock(block);
+      fileNode.removeLastBlock(block);
       getBlockManager().removeBlockFromMap(block);
       // If block is removed from blocksMap remove it from corruptReplicasMap
       getBlockManager().removeFromCorruptReplicasMap(block);
@@ -344,7 +371,9 @@
 
   /**
    * @see #unprotectedRenameTo(String, String, long)
+   * @deprecated Use {@link #renameTo(String, String, Rename...)} instead.
    */
+  @Deprecated
   boolean renameTo(String src, String dst) throws QuotaExceededException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: "
@@ -358,24 +387,44 @@
     return true;
   }
 
-  /** Change a path name
+  /**
+   * @see #unprotectedRenameTo(String, String, long, Options.Rename...)
+   */
+  void renameTo(String src, String dst, Options.Rename... options)
+      throws IOException {
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: " + src
+          + " to " + dst);
+    }
+    waitForReady();
+    long now = FSNamesystem.now();
+    unprotectedRenameTo(src, dst, now, options);
+    fsImage.getEditLog().logRename(src, dst, now, options);
+  }
+
+  /**
+   * Change a path name
    * 
    * @param src source path
    * @param dst destination path
    * @return true if rename succeeds; false otherwise
    * @throws QuotaExceededException if the operation violates any quota limit
+   * @deprecated See {@link #renameTo(String, String)}
    */
-  boolean unprotectedRenameTo(String src, String dst, long timestamp) 
-  throws QuotaExceededException {
+  @Deprecated
+  boolean unprotectedRenameTo(String src, String dst, long timestamp)
+      throws QuotaExceededException {
     synchronized (rootDir) {
       INode[] srcInodes = rootDir.getExistingPathINodes(src);
 
       // check the validation of the source
       if (srcInodes[srcInodes.length-1] == null) {
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-                                     +"failed to rename "+src+" to "+dst+ " because source does not exist");
+            + "failed to rename " + src + " to " + dst
+            + " because source does not exist");
         return false;
-      } else if (srcInodes.length == 1) {
+      } 
+      if (srcInodes.length == 1) {
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
             +"failed to rename "+src+" to "+dst+ " because source is the root");
         return false;
@@ -384,71 +433,243 @@
         dst += Path.SEPARATOR + new Path(src).getName();
       }
       
-      // remove source
-      INode srcChild = null;
-      try {
-        srcChild = removeChild(srcInodes, srcInodes.length-1);
-      } catch (IOException e) {
-        // srcChild == null; go to next if statement
+      // check the validity of the destination
+      if (dst.equals(src)) {
+        return true;
       }
-      if (srcChild == null) {
+      // dst cannot be directory or a file under src
+      if (dst.startsWith(src) && 
+          dst.charAt(src.length()) == Path.SEPARATOR_CHAR) {
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            +"failed to rename "+src+" to "+dst+ " because the source can not be removed");
+            + "failed to rename " + src + " to " + dst
+            + " because destination starts with src");
         return false;
       }
-
-      String srcChildName = srcChild.getLocalName();
       
-      // check the validity of the destination
-      INode dstChild = null;
-      QuotaExceededException failureByQuota = null;
-
       byte[][] dstComponents = INode.getPathComponents(dst);
       INode[] dstInodes = new INode[dstComponents.length];
       rootDir.getExistingPathINodes(dstComponents, dstInodes);
-      if (dstInodes[dstInodes.length-1] != null) { //check if destination exists
+      if (dstInodes[dstInodes.length-1] != null) {
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
                                      +"failed to rename "+src+" to "+dst+ 
                                      " because destination exists");
-      } else if (dstInodes[dstInodes.length-2] == null) { // check if its parent exists
+        return false;
+      }
+      if (dstInodes[dstInodes.length-2] == null) {
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
             +"failed to rename "+src+" to "+dst+ 
-            " because destination's parent does not exists");
+            " because destination's parent does not exist");
+        return false;
       }
-      else {
-        // add to the destination
+      
+      // Ensure dst has quota to accommodate rename
+      verifyQuotaForRename(srcInodes,dstInodes);
+      
+      INode dstChild = null;
+      INode srcChild = null;
+      String srcChildName = null;
+      try {
+        // remove src
+        srcChild = removeChild(srcInodes, srcInodes.length-1);
+        if (srcChild == null) {
+          NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+              + "failed to rename " + src + " to " + dst
+              + " because the source can not be removed");
+          return false;
+        }
+        srcChildName = srcChild.getLocalName();
         srcChild.setLocalName(dstComponents[dstInodes.length-1]);
-        try {
-          // add it to the namespace
-          dstChild = addChild(dstInodes, dstInodes.length-1, srcChild, false);
-        } catch (QuotaExceededException qe) {
-          failureByQuota = qe;
+        
+        // add src to the destination
+        dstChild = addChildNoQuotaCheck(dstInodes, dstInodes.length - 1,
+            srcChild, -1, false);
+        if (dstChild != null) {
+          srcChild = null;
+          if (NameNode.stateChangeLog.isDebugEnabled()) {
+            NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: " 
+                + src + " is renamed to " + dst);
+          }
+          // update modification time of the parents of src and dst
+          srcInodes[srcInodes.length-2].setModificationTime(timestamp);
+          dstInodes[dstInodes.length-2].setModificationTime(timestamp);
+          return true;
+        }
+      } finally {
+        if (dstChild == null && srcChild != null) {
+          // put it back
+          srcChild.setLocalName(srcChildName);
+          addChildNoQuotaCheck(srcInodes, srcInodes.length - 1, srcChild, -1,
+              false);
         }
       }
-      if (dstChild != null) {
-        if (NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: "
-            +src+" is renamed to "+dst);
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          +"failed to rename "+src+" to "+dst);
+      return false;
+    }
+  }
+
+  /**
+   * Rename src to dst.
+   * See {@link DistributedFileSystem#rename(Path, Path, Options.Rename...)}
+   * for details related to rename semantics.
+   * 
+   * @param src source path
+   * @param dst destination path
+   * @param timestamp modification time
+   * @param options Rename options
+   * @throws IOException if the rename is invalid or violates any quota limit
+   */
+  void unprotectedRenameTo(String src, String dst, long timestamp,
+      Options.Rename... options) throws IOException {
+    boolean overwrite = false;
+    if (null != options) {
+      for (Rename option : options) {
+        if (option == Rename.OVERWRITE) {
+          overwrite = true;
         }
+      }
+    }
+    String error = null;
+    synchronized (rootDir) {
+      final INode[] srcInodes = rootDir.getExistingPathINodes(src);
+      final INode srcInode = srcInodes[srcInodes.length - 1];
+      // validate source
+      if (srcInode == null) {
+        error = "rename source " + src + " is not found.";
+        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+            + error);
+        throw new FileNotFoundException(error);
+      }
+      if (srcInodes.length == 1) {
+        error = "rename source cannot be the root";
+        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+            + error);
+        throw new IOException(error);
+      }
 
-        // update modification time of dst and the parent of src
-        srcInodes[srcInodes.length-2].setModificationTime(timestamp);
-        dstInodes[dstInodes.length-2].setModificationTime(timestamp);
-        return true;
-      } else {
+      // validate the destination
+      if (dst.equals(src)) {
+        return;
+      }
+      // dst cannot be a directory or a file under src
+      if (dst.startsWith(src) && 
+          dst.charAt(src.length()) == Path.SEPARATOR_CHAR) {
+        error = "Rename destination " + dst
+            + " is a directory or file under source " + src;
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            +"failed to rename "+src+" to "+dst);
-        try {
-          // put it back
-          srcChild.setLocalName(srcChildName);
-          addChild(srcInodes, srcInodes.length-1, srcChild, false);
-        } catch (IOException ignored) {}
-        if (failureByQuota != null) {
-          throw failureByQuota;
-        } else {
-          return false;
+            + error);
+        throw new IOException(error);
+      }
+      final byte[][] dstComponents = INode.getPathComponents(dst);
+      final INode[] dstInodes = new INode[dstComponents.length];
+      rootDir.getExistingPathINodes(dstComponents, dstInodes);
+      INode dstInode = dstInodes[dstInodes.length - 1];
+      if (dstInodes.length == 1) {
+        error = "rename destination cannot be the root";
+        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+            + error);
+        throw new IOException(error);
+      }
+      if (dstInode != null) { // Destination exists
+        if (dstInode.isDirectory() != srcInode.isDirectory()) {
+          error = "Source " + src + " Destination " + dst
+              + " both should be either file or directory";
+          NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+              + error);
+          throw new IOException(error);
+        }
+        if (!overwrite) { // If destination exists, overwrite flag must be true
+          error = "rename destination " + dst + " already exists";
+          NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+              + error);
+          throw new FileAlreadyExistsException(error);
+        }
+        List<INode> children = dstInode.isDirectory() ? 
+            ((INodeDirectory) dstInode).getChildrenRaw() : null;
+        if (children != null && children.size() != 0) {
+          error = "rename cannot overwrite non empty destination directory "
+              + dst;
+          NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+              + error);
+          throw new IOException(error);
+        }
+      }
+      if (dstInodes[dstInodes.length - 2] == null) {
+        error = "rename destination parent " + dst + " not found.";
+        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+            + error);
+        throw new FileNotFoundException(error);
+      }
+      if (!dstInodes[dstInodes.length - 2].isDirectory()) {
+        error = "rename destination parent " + dst + " is a file.";
+        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+            + error);
+        throw new ParentNotDirectoryException(error);
+      }
+
+      // Ensure dst has quota to accommodate rename
+      verifyQuotaForRename(srcInodes, dstInodes);
+      INode removedSrc = removeChild(srcInodes, srcInodes.length - 1);
+      if (removedSrc == null) {
+        error = "Failed to rename " + src + " to " + dst
+            + " because the source can not be removed";
+        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+            + error);
+        throw new IOException(error);
+      }
+      final String srcChildName = removedSrc.getLocalName();
+      String dstChildName = null;
+      INode removedDst = null;
+      try {
+        if (dstInode != null) { // dst exists remove it
+          removedDst = removeChild(dstInodes, dstInodes.length - 1);
+          dstChildName = removedDst.getLocalName();
+        }
+
+        INode dstChild = null;
+        removedSrc.setLocalName(dstComponents[dstInodes.length - 1]);
+        // add src as dst to complete rename
+        dstChild = addChildNoQuotaCheck(dstInodes, dstInodes.length - 1,
+            removedSrc, -1, false);
+
+        if (dstChild != null) {
+          removedSrc = null;
+          if (NameNode.stateChangeLog.isDebugEnabled()) {
+            NameNode.stateChangeLog
+                .debug("DIR* FSDirectory.unprotectedRenameTo: " + src
+                    + " is renamed to " + dst);
+          }
+          srcInodes[srcInodes.length - 2].setModificationTime(timestamp);
+          dstInodes[dstInodes.length - 2].setModificationTime(timestamp);
+
+          // Collect the blocks and remove the lease for previous dst
+          if (removedDst != null) {
+            INode rmdst = removedDst;
+            removedDst = null;
+            List<Block> collectedBlocks = new ArrayList<Block>();
+            int filecount = rmdst.collectSubtreeBlocksAndClear(collectedBlocks);
+            incrDeletedFileCount(filecount);
+            getFSNamesystem().removePathAndBlocks(src, collectedBlocks);
+          }
+          return;
+        }
+      } finally {
+        if (removedSrc != null) {
+          // Rename failed - restore src
+          removedSrc.setLocalName(srcChildName);
+          addChildNoQuotaCheck(srcInodes, srcInodes.length - 1, removedSrc, -1,
+              false);
+        }
+        if (removedDst != null) {
+          // Rename failed - restore dst
+          removedDst.setLocalName(dstChildName);
+          addChildNoQuotaCheck(dstInodes, dstInodes.length - 1, removedDst, -1,
+              false);
         }
       }
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + "failed to rename " + src + " to " + dst);
+      throw new IOException("rename from " + src + " to " + dst + " failed.");
     }
   }
 
@@ -493,7 +714,7 @@
       // check disk quota
       long dsDelta = (replication - oldReplication[0]) *
            (fileNode.diskspaceConsumed()/oldReplication[0]);
-      updateCount(inodes, inodes.length-1, 0, dsDelta);
+      updateCount(inodes, inodes.length-1, 0, dsDelta, true);
 
       fileNode.setReplication(replication);
       fileBlocks = fileNode.getBlocks();
@@ -568,21 +789,92 @@
       }
     }
   }
+
+  /**
+   * 
+   * @param target
+   * @param srcs
+   * @throws IOException
+   */
+  public void concatInternal(String target, String [] srcs) throws IOException{
+    synchronized(rootDir) {
+      // actual move
+      waitForReady();
+
+      unprotectedConcat(target, srcs);
+      // do the commit
+      fsImage.getEditLog().logConcat(target, srcs, FSNamesystem.now());
+    }
+  }
+  
+
+  
+  /**
+   * Concat all the blocks from srcs to target
+   * and delete the srcs files
+   * @param target target file to move the blocks to
+   * @param srcs list of file to move the blocks from
+   * Must be public because also called from EditLogs
+   * NOTE: - it does not update quota (not needed for concat)
+   */
+  public void unprotectedConcat(String target, String [] srcs) throws IOException {
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* FSNamesystem.concat to "+target);
+    }
+    // do the move
+    
+    INode [] trgINodes =  getExistingPathINodes(target);
+    INodeFile trgInode = (INodeFile) trgINodes[trgINodes.length-1];
+    INodeDirectory trgParent = (INodeDirectory)trgINodes[trgINodes.length-2];
+    
+    INodeFile [] allSrcInodes = new INodeFile[srcs.length];
+    int i = 0;
+    int totalBlocks = 0;
+    for(String src : srcs) {
+      INodeFile srcInode = getFileINode(src);
+      allSrcInodes[i++] = srcInode;
+      totalBlocks += srcInode.blocks.length;  
+    }
+    trgInode.appendBlocks(allSrcInodes, totalBlocks); // copy the blocks
     
+    // since we are in the same dir - we can use same parent to remove files
+    int count = 0;
+    for(INodeFile nodeToRemove: allSrcInodes) {
+      if(nodeToRemove == null) continue;
+      
+      nodeToRemove.blocks = null;
+      trgParent.removeChild(nodeToRemove);
+      count++;
+    }
+    
+    long now = FSNamesystem.now();
+    trgInode.setModificationTime(now);
+    trgParent.setModificationTime(now);
+    // update quota on the parent directory ('count' files removed, 0 space)
+    unprotectedUpdateCount(trgINodes, trgINodes.length-1, - count, 0);
+  }
+
   /**
-   * Remove the file from management, return blocks
+   * Delete the target directory and collect the blocks under it
+   * 
+   * @param src Path of a directory to delete
+   * @param collectedBlocks Blocks under the deleted directory
+   * @return true on successful deletion; else false
    */
-  INode delete(String src) {
+  boolean delete(String src, List<Block>collectedBlocks) {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: "+src);
+      NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + src);
     }
     waitForReady();
     long now = FSNamesystem.now();
-    INode deletedNode = unprotectedDelete(src, now);
-    if (deletedNode != null) {
-      fsImage.getEditLog().logDelete(src, now);
+    INode removedNode = unprotectedDelete(src, collectedBlocks, now);
+    if (removedNode == null) {
+      return false;
     }
-    return deletedNode;
+    // Blocks will be deleted later by the caller of this method
+    getFSNamesystem().removePathAndBlocks(src, null);
+    fsImage.getEditLog().logDelete(src, now);
+    return true;
   }
   
   /** Return if a directory is empty or not **/
@@ -608,12 +900,30 @@
   /**
    * Delete a path from the name space
    * Update the count at each ancestor directory with quota
+   * <br>
+   * Note: This is to be used by {@link FSEditLog} only.
+   * <br>
+   * @param src a string representation of a path to an inode
+   * @param mtime the time the inode is removed
+   * @return deleted inode if deletion succeeds; else null
+   */ 
+  INode unprotectedDelete(String src, long mtime) {
+    List<Block> collectedBlocks = new ArrayList<Block>();
+    INode removedNode = unprotectedDelete(src, collectedBlocks, mtime);
+    getFSNamesystem().removePathAndBlocks(src, collectedBlocks);
+    return removedNode;
+  }
+  
+  /**
+   * Delete a path from the name space
+   * Update the count at each ancestor directory with quota
    * @param src a string representation of a path to an inode
-   * @param modificationTime the time the inode is removed
-   * @param deletedBlocks the place holder for the blocks to be removed
-   * @return if the deletion succeeds
+   * @param collectedBlocks blocks collected from the deleted path
+   * @param mtime the time the inode is removed
+   * @return deleted inode if deletion succeeds; else null
    */ 
-  INode unprotectedDelete(String src, long modificationTime) {
+  INode unprotectedDelete(String src, List<Block> collectedBlocks, 
+      long mtime) {
     src = normalizePath(src);
 
     synchronized (rootDir) {
@@ -624,33 +934,28 @@
         NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
             +"failed to remove "+src+" because it does not exist");
         return null;
-      } else if (inodes.length == 1) { // src is the root
+      }
+      if (inodes.length == 1) { // src is the root
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: " +
             "failed to remove " + src +
             " because the root is not allowed to be deleted");
         return null;
-      } else {
-        try {
-          // Remove the node from the namespace
-          removeChild(inodes, inodes.length-1);
-          // set the parent's modification time
-          inodes[inodes.length-2].setModificationTime(modificationTime);
-          // GC all the blocks underneath the node.
-          ArrayList<Block> v = new ArrayList<Block>();
-          int filesRemoved = targetNode.collectSubtreeBlocksAndClear(v);
-          incrDeletedFileCount(filesRemoved);
-          getFSNamesystem().removePathAndBlocks(src, v);
-          if (NameNode.stateChangeLog.isDebugEnabled()) {
-            NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
-              +src+" is removed");
-          }
-          return targetNode;
-        } catch(QuotaExceededException e) {
-          NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: " +
-              "failed to remove " + src + " because " + e.getMessage());
-          return null;
-        }
       }
+      int pos = inodes.length - 1;
+      // Remove the node from the namespace
+      targetNode = removeChild(inodes, pos);
+      if (targetNode == null) {
+        return null;
+      }
+      // set the parent's modification time
+      inodes[pos-1].setModificationTime(mtime);
+      int filesRemoved = targetNode.collectSubtreeBlocksAndClear(collectedBlocks);
+      incrDeletedFileCount(filesRemoved);
+      if (NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
+          +src+" is removed");
+      }
+      return targetNode;
     }
   }
 
@@ -699,7 +1004,7 @@
       }
       
       int index = 0;
-      for (Block b : newnode.getBlocks()) {
+      for (BlockInfo b : newnode.getBlocks()) {
         BlockInfo info = getBlockManager().addINode(b, newnode);
         newnode.setBlock(index, info); // inode refers to the block in BlocksMap
         index++;
@@ -844,7 +1149,7 @@
         throw new FileNotFoundException(path + 
                                         " does not exist under rootDir.");
       }
-      updateCount(inodes, len-1, nsDelta, dsDelta);
+      updateCount(inodes, len-1, nsDelta, dsDelta, true);
     }
   }
   
@@ -854,10 +1159,11 @@
    * @param numOfINodes the number of inodes to update starting from index 0
    * @param nsDelta the delta change of namespace
    * @param dsDelta the delta change of diskspace
+   * @param checkQuota if true then check if quota is exceeded
    * @throws QuotaExceededException if the new count violates any quota limit
    */
   private void updateCount(INode[] inodes, int numOfINodes, 
-                           long nsDelta, long dsDelta)
+                           long nsDelta, long dsDelta, boolean checkQuota)
                            throws QuotaExceededException {
     if (!ready) {
       //still initializing. do not check or update quotas.
@@ -866,28 +1172,45 @@
     if (numOfINodes>inodes.length) {
       numOfINodes = inodes.length;
     }
-    // check existing components in the path  
-    int i=0;
-    try {
-      for(; i < numOfINodes; i++) {
-        if (inodes[i].isQuotaSet()) { // a directory with quota
-          INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i]; 
-          node.updateNumItemsInTree(nsDelta, dsDelta);
-        }
+    if (checkQuota) {
+      verifyQuota(inodes, numOfINodes, nsDelta, dsDelta, null);
+    }
+    for(int i = 0; i < numOfINodes; i++) {
+      if (inodes[i].isQuotaSet()) { // a directory with quota
+        INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i]; 
+        node.updateNumItemsInTree(nsDelta, dsDelta);
       }
+    }
+  }
+  
+  /** 
+   * update quota of each inode without checking whether quota is exceeded.
+   * See {@link #updateCount(INode[], int, long, long, boolean)}
+   */ 
+  private void updateCountNoQuotaCheck(INode[] inodes, int numOfINodes, 
+                           long nsDelta, long dsDelta) {
+    try {
+      updateCount(inodes, numOfINodes, nsDelta, dsDelta, false);
     } catch (QuotaExceededException e) {
-      e.setPathName(getFullPathName(inodes, i));
-      // undo updates
-      for( ; i-- > 0; ) {
-        try {
-          if (inodes[i].isQuotaSet()) { // a directory with quota
-            INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i]; 
-            node.updateNumItemsInTree(-nsDelta, -dsDelta);
-          }
-        } catch (IOException ingored) {
-        }
+      NameNode.LOG.warn("FSDirectory.updateCountNoQuotaCheck - unexpected ", e);
+    }
+  }
+  
+  /**
+   * updates quota without verification
+   * caller's responsibility is to make sure quota is not exceeded
+   * @param inodes
+   * @param numOfINodes
+   * @param nsDelta
+   * @param dsDelta
+   */
+   void unprotectedUpdateCount(INode[] inodes, int numOfINodes, 
+                                      long nsDelta, long dsDelta) {
+    for(int i=0; i < numOfINodes; i++) {
+      if (inodes[i].isQuotaSet()) { // a directory with quota
+        INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i]; 
+        node.unprotectedUpdateNumItemsInTree(nsDelta, dsDelta);
       }
-      throw e;
     }
   }
   
@@ -899,6 +1222,23 @@
     }
     return fullPathName.toString();
   }
+
+  /** Return the full path name of the specified inode */
+  static String getFullPathName(INode inode) {
+    // calculate the depth of this inode from root
+    int depth = 0;
+    for (INode i = inode; i != null; i = i.parent) {
+      depth++;
+    }
+    INode[] inodes = new INode[depth];
+
+    // fill up the inodes in the path from this inode to root
+    for (int i = 0; i < depth; i++) {
+      inodes[depth-i-1] = inode;
+      inode = inode.parent;
+    }
+    return getFullPathName(inodes, depth-1);
+  }
   
   /**
    * Create a directory 
@@ -917,7 +1257,7 @@
    */
   boolean mkdirs(String src, PermissionStatus permissions,
       boolean inheritPermission, long now)
-      throws FileNotFoundException, QuotaExceededException {
+      throws FileAlreadyExistsException, QuotaExceededException {
     src = normalizePath(src);
     String[] names = INode.getPathNames(src);
     byte[][] components = INode.getPathComponents(names);
@@ -932,7 +1272,7 @@
       for(; i < inodes.length && inodes[i] != null; i++) {
         pathbuilder.append(Path.SEPARATOR + names[i]);
         if (!inodes[i].isDirectory()) {
-          throw new FileNotFoundException("Parent path is not a directory: "
+          throw new FileAlreadyExistsException("Parent path is not a directory: "
               + pathbuilder);
         }
       }
@@ -981,7 +1321,7 @@
       long timestamp) throws QuotaExceededException {
     inodes[pos] = addChild(inodes, pos, 
         new INodeDirectory(name, permission, timestamp),
-        inheritPermission );
+        -1, inheritPermission );
   }
   
   /** Add a node child to the namespace. The full path name of the node is src.
@@ -999,48 +1339,142 @@
                       inheritPermission);
     }
   }
+
+  /**
+   * Verify quota for adding or moving a new INode with required 
+   * namespace and diskspace to a given position.
+   *  
+   * @param inodes INodes corresponding to a path
+   * @param pos position where a new INode will be added
+   * @param nsDelta needed namespace
+   * @param dsDelta needed diskspace
+   * @param commonAncestor Last node in inodes array that is a common ancestor
+   *          for an INode that is being moved from one location to the other.
+   *          Pass null if a node is not being moved.
+   * @throws QuotaExceededException if quota limit is exceeded.
+   */
+  private void verifyQuota(INode[] inodes, int pos, long nsDelta, long dsDelta,
+      INode commonAncestor) throws QuotaExceededException {
+    if (!ready) {
+      // Do not check quota if edits log is still being processed
+      return;
+    }
+    if (nsDelta <= 0 && dsDelta <= 0) {
+      // if quota is being freed or not being consumed
+      return;
+    }
+    if (pos>inodes.length) {
+      pos = inodes.length;
+    }
+    int i = pos - 1;
+    try {
+      // check existing components in the path  
+      for(; i >= 0; i--) {
+        if (commonAncestor == inodes[i]) {
+          // Moving an existing node. Stop checking for quota when common
+          // ancestor is reached
+          return;
+        }
+        if (inodes[i].isQuotaSet()) { // a directory with quota
+          INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i]; 
+          node.verifyQuota(nsDelta, dsDelta);
+        }
+      }
+    } catch (QuotaExceededException e) {
+      e.setPathName(getFullPathName(inodes, i));
+      throw e;
+    }
+  }
   
-  /** Add a node child to the inodes at index pos. 
-   * Its ancestors are stored at [0, pos-1]. 
-   * QuotaExceededException is thrown if it violates quota limit */
-  private <T extends INode> T addChild(INode[] pathComponents, int pos, T child,
-      boolean inheritPermission) throws QuotaExceededException {
-    return addChild(pathComponents, pos, child, -1, inheritPermission);
+  /**
+   * Verify quota for rename operation where srcInodes[srcInodes.length-1] is
+   * moved to dstInodes[dstInodes.length-1].
+   * 
+   * @param srcInodes directory from where node is being moved.
+   * @param dstInodes directory to where node is moved to.
+   * @throws QuotaExceededException if quota limit is exceeded.
+   */
+  private void verifyQuotaForRename(INode[] srcInodes, INode[]dstInodes)
+      throws QuotaExceededException {
+    if (!ready) {
+      // Do not check quota if edits log is still being processed
+      return;
+    }
+    INode srcInode = srcInodes[srcInodes.length - 1];
+    INode commonAncestor = null;
+    for(int i =0;srcInodes[i] == dstInodes[i]; i++) {
+      commonAncestor = srcInodes[i];
+    }
+    INode.DirCounts srcCounts = new INode.DirCounts();
+    srcInode.spaceConsumedInTree(srcCounts);
+    long nsDelta = srcCounts.getNsCount();
+    long dsDelta = srcCounts.getDsCount();
+    
+    // Reduce the required quota by dst that is being removed
+    INode dstInode = dstInodes[dstInodes.length - 1];
+    if (dstInode != null) {
+      INode.DirCounts dstCounts = new INode.DirCounts();
+      dstInode.spaceConsumedInTree(dstCounts);
+      nsDelta -= dstCounts.getNsCount();
+      dsDelta -= dstCounts.getDsCount();
+    }
+    verifyQuota(dstInodes, dstInodes.length - 1, nsDelta, dsDelta,
+        commonAncestor);
   }
   
   /** Add a node child to the inodes at index pos. 
    * Its ancestors are stored at [0, pos-1]. 
    * QuotaExceededException is thrown if it violates quota limit */
-  private <T extends INode> T addChild(INode[] pathComponents, int pos, T child,
-       long childDiskspace, boolean inheritPermission) throws QuotaExceededException {
+  private <T extends INode> T addChild(INode[] pathComponents, int pos,
+      T child, long childDiskspace, boolean inheritPermission,
+      boolean checkQuota) throws QuotaExceededException {
     INode.DirCounts counts = new INode.DirCounts();
     child.spaceConsumedInTree(counts);
     if (childDiskspace < 0) {
       childDiskspace = counts.getDsCount();
     }
-    updateCount(pathComponents, pos, counts.getNsCount(), childDiskspace);
+    updateCount(pathComponents, pos, counts.getNsCount(), childDiskspace,
+        checkQuota);
     T addedNode = ((INodeDirectory)pathComponents[pos-1]).addChild(
         child, inheritPermission);
     if (addedNode == null) {
-      updateCount(pathComponents, pos, 
-                  -counts.getNsCount(), -childDiskspace);
+      updateCount(pathComponents, pos, -counts.getNsCount(), 
+          -childDiskspace, true);
     }
     return addedNode;
   }
+
+  private <T extends INode> T addChild(INode[] pathComponents, int pos,
+      T child, long childDiskspace, boolean inheritPermission)
+      throws QuotaExceededException {
+    return addChild(pathComponents, pos, child, childDiskspace,
+        inheritPermission, true);
+  }
+  
+  private <T extends INode> T addChildNoQuotaCheck(INode[] pathComponents,
+      int pos, T child, long childDiskspace, boolean inheritPermission) {
+    T inode = null;
+    try {
+      inode = addChild(pathComponents, pos, child, childDiskspace,
+          inheritPermission, false);
+    } catch (QuotaExceededException e) {
+      NameNode.LOG.warn("FSDirectory.addChildNoQuotaCheck - unexpected", e); 
+    }
+    return inode;
+  }
   
   /** Remove an inode at index pos from the namespace.
    * Its ancestors are stored at [0, pos-1].
    * Count of each ancestor with quota is also updated.
    * Return the removed node; null if the removal fails.
    */
-  private INode removeChild(INode[] pathComponents, int pos)
-  throws QuotaExceededException {
+  private INode removeChild(INode[] pathComponents, int pos) {
     INode removedNode = 
       ((INodeDirectory)pathComponents[pos-1]).removeChild(pathComponents[pos]);
     if (removedNode != null) {
       INode.DirCounts counts = new INode.DirCounts();
       removedNode.spaceConsumedInTree(counts);
-      updateCount(pathComponents, pos,
+      updateCountNoQuotaCheck(pathComponents, pos,
                   -counts.getNsCount(), -counts.getDsCount());
     }
     return removedNode;

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Sat Nov 28 20:05:56 2009
@@ -24,14 +24,19 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
 
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@@ -42,12 +47,11 @@
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
-import org.mortbay.log.Log;
 
 /**
  * FSEditLog maintains a log of the namespace modifications.
@@ -56,7 +60,7 @@
 public class FSEditLog {
   public  static final byte OP_INVALID = -1;
   private static final byte OP_ADD = 0;
-  private static final byte OP_RENAME = 1;  // rename
+  private static final byte OP_RENAME_OLD = 1;  // rename
   private static final byte OP_DELETE = 2;  // delete
   private static final byte OP_MKDIR = 3;   // create directory
   private static final byte OP_SET_REPLICATION = 4; // set replication
@@ -73,6 +77,9 @@
   private static final byte OP_CLEAR_NS_QUOTA = 12; // clear namespace quota
   private static final byte OP_TIMES = 13; // sets mod & access time on a file
   private static final byte OP_SET_QUOTA = 14; // sets name and disk quotas.
+  private static final byte OP_RENAME = 15;  // new rename
+  private static final byte OP_CONCAT_DELETE = 16; // concat files.
+
   /* 
    * The following operations are used to control remote edit log streams,
    * and not logged into file streams.
@@ -407,6 +414,7 @@
     return numEdits;
   }
 
+  @SuppressWarnings("deprecation")
   int loadEditRecords(int logVersion, DataInputStream in,
                              boolean closeOnExit) throws IOException {
     FSNamesystem fsNamesys = fsimage.getFSNamesystem();
@@ -416,9 +424,9 @@
     String clientMachine = null;
     String path = null;
     int numOpAdd = 0, numOpClose = 0, numOpDelete = 0,
-        numOpRename = 0, numOpSetRepl = 0, numOpMkDir = 0,
+        numOpRenameOld = 0, numOpSetRepl = 0, numOpMkDir = 0,
         numOpSetPerm = 0, numOpSetOwner = 0, numOpSetGenStamp = 0,
-        numOpTimes = 0, numOpOther = 0;
+        numOpTimes = 0, numOpRename = 0, numOpConcatDelete = 0, numOpOther = 0;
     try {
       while (true) {
         long timestamp = 0;
@@ -461,19 +469,9 @@
             blockSize = readLong(in);
           }
           // get blocks
-          Block blocks[] = null;
-          if (logVersion <= -14) {
-            blocks = readBlocks(in);
-          } else {
-            BlockTwo oldblk = new BlockTwo();
-            int num = in.readInt();
-            blocks = new Block[num];
-            for (int i = 0; i < num; i++) {
-              oldblk.readFields(in);
-              blocks[i] = new Block(oldblk.blkid, oldblk.len, 
-                                    Block.GRANDFATHER_GENERATION_STAMP);
-            }
-          }
+          boolean isFileUnderConstruction = (opcode == OP_ADD);
+          BlockInfo blocks[] = 
+            readBlocks(in, logVersion, isFileUnderConstruction, replication);
 
           // Older versions of HDFS does not store the block size in inode.
           // If the file has more than one block, use the size of the
@@ -521,7 +519,7 @@
                                                     path, permissions,
                                                     blocks, replication, 
                                                     mtime, atime, blockSize);
-          if (opcode == OP_ADD) {
+          if (isFileUnderConstruction) {
             numOpAdd++;
             //
             // Replace current node with a INodeUnderConstruction.
@@ -538,7 +536,7 @@
                                       clientMachine, 
                                       null);
             fsDir.replaceNode(path, node, cons);
-            fsNamesys.leaseManager.addLease(cons.clientName, path);
+            fsNamesys.leaseManager.addLease(cons.getClientName(), path);
           }
           break;
         } 
@@ -549,8 +547,29 @@
           fsDir.unprotectedSetReplication(path, replication, null);
           break;
         } 
-        case OP_RENAME: {
-          numOpRename++;
+        case OP_CONCAT_DELETE: {
+          if (logVersion > -22) {
+            throw new IOException("Unexpected opcode " + opcode
+                + " for version " + logVersion);
+          }
+          numOpConcatDelete++;
+          int length = in.readInt();
+          if (length < 3) { // trg, srcs.., timestam
+            throw new IOException("Incorrect data format. " 
+                                  + "Mkdir operation.");
+          }
+          String trg = FSImage.readString(in);
+          int srcSize = length - 1 - 1; //trg and timestamp
+          String [] srcs = new String [srcSize];
+          for(int i=0; i<srcSize;i++) {
+            srcs[i]= FSImage.readString(in);
+          }
+          timestamp = readLong(in);
+          fsDir.unprotectedConcat(trg, srcs);
+          break;
+        }
+        case OP_RENAME_OLD: {
+          numOpRenameOld++;
           int length = in.readInt();
           if (length != 3) {
             throw new IOException("Incorrect data format. " 
@@ -681,6 +700,26 @@
           fsDir.unprotectedSetTimes(path, mtime, atime, true);
           break;
         }
+        case OP_RENAME: {
+          if (logVersion > -21) {
+            throw new IOException("Unexpected opcode " + opcode
+                + " for version " + logVersion);
+          }
+          numOpRename++;
+          int length = in.readInt();
+          if (length != 3) {
+            throw new IOException("Incorrect data format. " 
+                                  + "Mkdir operation.");
+          }
+          String s = FSImage.readString(in);
+          String d = FSImage.readString(in);
+          timestamp = readLong(in);
+          Rename[] options = readRenameOptions(in);
+          FileStatus dinfo = fsDir.getFileInfo(d);
+          fsDir.unprotectedRenameTo(s, d, timestamp, options);
+          fsNamesys.changeLease(s, d, dinfo);
+          break;
+        }
         default: {
           throw new IOException("Never seen opcode " + opcode);
         }
@@ -692,12 +731,15 @@
     }
     if (FSImage.LOG.isDebugEnabled()) {
       FSImage.LOG.debug("numOpAdd = " + numOpAdd + " numOpClose = " + numOpClose 
-          + " numOpDelete = " + numOpDelete + " numOpRename = " + numOpRename 
+          + " numOpDelete = " + numOpDelete 
+          + " numOpRenameOld = " + numOpRenameOld 
           + " numOpSetRepl = " + numOpSetRepl + " numOpMkDir = " + numOpMkDir
           + " numOpSetPerm = " + numOpSetPerm 
           + " numOpSetOwner = " + numOpSetOwner
           + " numOpSetGenStamp = " + numOpSetGenStamp 
           + " numOpTimes = " + numOpTimes
+          + " numOpConcatDelete  = " + numOpConcatDelete
+          + " numOpRename = " + numOpRename
           + " numOpOther = " + numOpOther);
     }
     return numEdits;
@@ -737,7 +779,7 @@
     ArrayList<EditLogOutputStream> errorStreams = null;
     long start = FSNamesystem.now();
     for(EditLogOutputStream eStream : editStreams) {
-      Log.debug("loggin edits into " + eStream.getName()  + " stream");
+      FSImage.LOG.debug("loggin edits into " + eStream.getName()  + " stream");
       if(!eStream.isOperationSupported(op))
         continue;
       try {
@@ -942,7 +984,19 @@
       new DeprecatedUTF8(src),
       new DeprecatedUTF8(dst),
       FSEditLog.toLogLong(timestamp)};
-    logEdit(OP_RENAME, new ArrayWritable(DeprecatedUTF8.class, info));
+    logEdit(OP_RENAME_OLD, new ArrayWritable(DeprecatedUTF8.class, info));
+  }
+  
+  /** 
+   * Add rename record to edit log
+   */
+  void logRename(String src, String dst, long timestamp, Options.Rename... options) {
+    DeprecatedUTF8 info[] = new DeprecatedUTF8[] { 
+      new DeprecatedUTF8(src),
+      new DeprecatedUTF8(dst),
+      FSEditLog.toLogLong(timestamp)};
+    logEdit(OP_RENAME, new ArrayWritable(DeprecatedUTF8.class, info),
+        toBytesWritable(options));
   }
   
   /** 
@@ -975,7 +1029,22 @@
     DeprecatedUTF8 g = new DeprecatedUTF8(groupname == null? "": groupname);
     logEdit(OP_SET_OWNER, new DeprecatedUTF8(src), u, g);
   }
-
+  
+  /**
+   * Add concat(trg, srcs...) record to edit log
+   */
+  void logConcat(String trg, String [] srcs, long timestamp) {
+    int size = 1 + srcs.length + 1; // trg, srcs, timestamp
+    DeprecatedUTF8 info[] = new DeprecatedUTF8[size];
+    int idx = 0;
+    info[idx++] = new DeprecatedUTF8(trg);
+    for(int i=0; i<srcs.length; i++) {
+      info[idx++] = new DeprecatedUTF8(srcs[i]);
+    }
+    info[idx] = FSEditLog.toLogLong(timestamp);
+    logEdit(OP_CONCAT_DELETE, new ArrayWritable(DeprecatedUTF8.class, info));
+  }
+  
   /** 
    * Add delete file record to edit log
    */
@@ -1247,12 +1316,27 @@
     return Long.parseLong(FSImage.readString(in));
   }
 
-  static private Block[] readBlocks(DataInputStream in) throws IOException {
+  static private BlockInfo[] readBlocks(
+      DataInputStream in,
+      int logVersion,
+      boolean isFileUnderConstruction,
+      short replication) throws IOException {
     int numBlocks = in.readInt();
-    Block[] blocks = new Block[numBlocks];
+    BlockInfo[] blocks = new BlockInfo[numBlocks];
+    Block blk = new Block();
+    BlockTwo oldblk = new BlockTwo();
     for (int i = 0; i < numBlocks; i++) {
-      blocks[i] = new Block();
-      blocks[i].readFields(in);
+      if (logVersion <= -14) {
+        blk.readFields(in);
+      } else {
+        oldblk.readFields(in);
+        blk.set(oldblk.blkid, oldblk.len,
+                GenerationStamp.GRANDFATHER_GENERATION_STAMP);
+      }
+      if(isFileUnderConstruction && i == numBlocks-1)
+        blocks[i] = new BlockInfoUnderConstruction(blk, replication);
+      else
+        blocks[i] = new BlockInfo(blk, replication);
     }
     return blocks;
   }
@@ -1437,4 +1521,25 @@
     processIOError(errorStreams, true);
     return regAllowed;
   }
+  
+  static Rename[] readRenameOptions(DataInputStream in) throws IOException {
+    BytesWritable writable = new BytesWritable();
+    writable.readFields(in);
+    
+    byte[] bytes = writable.getBytes();
+    Rename[] options = new Rename[bytes.length];
+    
+    for (int i = 0; i < bytes.length; i++) {
+      options[i] = Rename.valueOf(bytes[i]);
+    }
+    return options;
+  }
+  
+  static BytesWritable toBytesWritable(Options.Rename... options) {
+    byte[] bytes = new byte[options.length];
+    for (int i = 0; i < options.length; i++) {
+      bytes[i] = options[i].value();
+    }
+    return new BytesWritable(bytes);
+  }
 }

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Sat Nov 28 20:05:56 2009
@@ -51,10 +51,12 @@
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.common.UpgradeManager;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
@@ -63,6 +65,7 @@
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.io.Writable;
 
 /**
@@ -372,12 +375,12 @@
     if(startOpt == StartupOption.IMPORT 
         && (checkpointDirs == null || checkpointDirs.isEmpty()))
       throw new IOException("Cannot import image from a checkpoint. "
-                          + "\"fs.checkpoint.dir\" is not set." );
+                          + "\"dfs.namenode.checkpoint.dir\" is not set." );
 
     if(startOpt == StartupOption.IMPORT 
         && (checkpointEditsDirs == null || checkpointEditsDirs.isEmpty()))
       throw new IOException("Cannot import image from a checkpoint. "
-                          + "\"fs.checkpoint.edits.dir\" is not set." );
+                          + "\"dfs.namenode.checkpoint.dir\" is not set." );
     
     setStorageDirectories(dataDirs, editsDirs);
     // 1. For each data directory calculate its state and 
@@ -1075,7 +1078,7 @@
             blocks[j] = new Block();
             if (-14 < imgVersion) {
               blocks[j].set(in.readLong(), in.readLong(), 
-                            Block.GRANDFATHER_GENERATION_STAMP);
+                            GenerationStamp.GRANDFATHER_GENERATION_STAMP);
             } else {
               blocks[j].readFields(in);
             }
@@ -1403,7 +1406,7 @@
       }
       INodeFile oldnode = (INodeFile) old;
       fsDir.replaceNode(path, oldnode, cons);
-      fs.leaseManager.addLease(cons.clientName, path); 
+      fs.leaseManager.addLease(cons.getClientName(), path); 
     }
   }
 
@@ -1419,10 +1422,17 @@
     int numBlocks = in.readInt();
     BlockInfo[] blocks = new BlockInfo[numBlocks];
     Block blk = new Block();
-    for (int i = 0; i < numBlocks; i++) {
+    int i = 0;
+    for (; i < numBlocks-1; i++) {
       blk.readFields(in);
       blocks[i] = new BlockInfo(blk, blockReplication);
     }
+    // last block is UNDER_CONSTRUCTION
+    if(numBlocks > 0) {
+      blk.readFields(in);
+      blocks[i] = new BlockInfoUnderConstruction(
+        blk, blockReplication, BlockUCState.UNDER_CONSTRUCTION, null);
+    }
     PermissionStatus perm = PermissionStatus.read(in);
     String clientName = readString(in);
     String clientMachine = readString(in);
@@ -1430,7 +1440,7 @@
     // These locations are not used at all
     int numLocs = in.readInt();
     DatanodeDescriptor[] locations = new DatanodeDescriptor[numLocs];
-    for (int i = 0; i < numLocs; i++) {
+    for (i = 0; i < numLocs; i++) {
       locations[i] = new DatanodeDescriptor();
       locations[i].readFields(in);
     }
@@ -1893,7 +1903,7 @@
    */
   static Collection<URI> getCheckpointDirs(Configuration conf,
       String defaultValue) {
-    Collection<String> dirNames = conf.getStringCollection("fs.checkpoint.dir");
+    Collection<String> dirNames = conf.getStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY);
     if (dirNames.size() == 0 && defaultValue != null) {
       dirNames.add(defaultValue);
     }
@@ -1919,7 +1929,7 @@
   static Collection<URI> getCheckpointEditsDirs(Configuration conf,
       String defaultName) {
     Collection<String> dirNames = 
-      conf.getStringCollection("fs.checkpoint.edits.dir");
+      conf.getStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY);
     if (dirNames.size() == 0 && defaultName != null) {
       dirNames.add(defaultName);
     }



Mime
View raw message