hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ste...@apache.org
Subject svn commit: r885143 [13/18] - in /hadoop/hdfs/branches/HDFS-326: ./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/ src/ant/org/apache/hadoop/ant/ src/ant/org/apache/hadoop/ant/condition/ src/c++/ src/c++/libhdfs/ src/c++/libhdfs/docs...
Date Sat, 28 Nov 2009 20:06:08 GMT
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Sat Nov 28 20:05:56 2009
@@ -22,15 +22,16 @@
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.security.AccessTokenHandler;
+import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMetrics;
 import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.AccessTokenHandler;
-import org.apache.hadoop.security.ExportedAccessKeys;
 import org.apache.hadoop.security.PermissionChecker;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -39,6 +40,7 @@
 import org.apache.hadoop.net.CachedDNSToSwitchMapping;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
@@ -52,10 +54,16 @@
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.io.IOUtils;
@@ -89,7 +97,7 @@
  * 4)  machine --> blocklist (inverted #2)
  * 5)  LRU cache of updated-heartbeat machines
  ***************************************************/
-public class FSNamesystem implements FSConstants, FSNamesystemMBean {
+public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterStats {
   public static final Log LOG = LogFactory.getLog(FSNamesystem.class);
   public static final String AUDIT_FORMAT =
     "ugi=%s\t" +  // ugi
@@ -123,6 +131,7 @@
   public static final Log auditLog = LogFactory.getLog(
       FSNamesystem.class.getName() + ".audit");
 
+  static int BLOCK_DELETION_INCREMENT = 1000;
   private boolean isPermissionEnabled;
   private UserGroupInformation fsOwner;
   private String supergroup;
@@ -199,8 +208,7 @@
   private long heartbeatExpireInterval;
   //replicationRecheckInterval is how often namenode checks for new replication work
   private long replicationRecheckInterval;
-  // default block size of a file
-  private long defaultBlockSize = 0;
+  private FsServerDefaults serverDefaults;
   // allow appending to hdfs files
   private boolean supportAppends = true;
 
@@ -288,7 +296,8 @@
     dnthread.start();
 
     this.dnsToSwitchMapping = ReflectionUtils.newInstance(
-        conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
+        conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, 
+                      ScriptBasedMapping.class,
             DNSToSwitchMapping.class), conf);
     
     /* If the dns to swith mapping supports cache, resolve network 
@@ -302,7 +311,7 @@
   }
 
   public static Collection<URI> getNamespaceDirs(Configuration conf) {
-    return getStorageDirs(conf, "dfs.name.dir");
+    return getStorageDirs(conf, DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
   }
 
   public static Collection<URI> getStorageDirs(Configuration conf,
@@ -314,7 +323,7 @@
       // but will retain directories specified in hdfs-site.xml
       // When importing image from a checkpoint, the name-node can
       // start with empty set of storage directories.
-      Configuration cE = new Configuration(false);
+      Configuration cE = new HdfsConfiguration(false);
       cE.addResource("core-default.xml");
       cE.addResource("core-site.xml");
       cE.addResource("hdfs-default.xml");
@@ -353,7 +362,7 @@
   }
 
   public static Collection<URI> getNamespaceEditsDirs(Configuration conf) {
-    return getStorageDirs(conf, "dfs.name.edits.dir");
+    return getStorageDirs(conf, DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY);
   }
 
   /**
@@ -397,34 +406,48 @@
     }
     LOG.info("fsOwner=" + fsOwner);
 
-    this.supergroup = conf.get("dfs.permissions.supergroup", "supergroup");
-    this.isPermissionEnabled = conf.getBoolean("dfs.permissions", true);
+    this.supergroup = conf.get(DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY, 
+                               DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
+    this.isPermissionEnabled = conf.getBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
+                                               DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
     LOG.info("supergroup=" + supergroup);
     LOG.info("isPermissionEnabled=" + isPermissionEnabled);
-    short filePermission = (short)conf.getInt("dfs.upgrade.permission", 00777);
+    short filePermission = (short)conf.getInt(DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_KEY,
+                                              DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_DEFAULT);
     this.defaultPermission = PermissionStatus.createImmutable(
         fsOwner.getUserName(), supergroup, new FsPermission(filePermission));
 
     long heartbeatInterval = conf.getLong("dfs.heartbeat.interval", 3) * 1000;
     this.heartbeatRecheckInterval = conf.getInt(
-        "heartbeat.recheck.interval", 5 * 60 * 1000); // 5 minutes
+        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 
+        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
     this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval +
       10 * heartbeatInterval;
     this.replicationRecheckInterval = 
-      conf.getInt("dfs.replication.interval", 3) * 1000L;
-    this.defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
-    this.maxFsObjects = conf.getLong("dfs.max.objects", 0);
+      conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 
+                  DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;
+    this.serverDefaults = new FsServerDefaults(
+        conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE),
+        conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BYTES_PER_CHECKSUM),
+        conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DEFAULT_WRITE_PACKET_SIZE),
+        (short) conf.getInt("dfs.replication", DEFAULT_REPLICATION_FACTOR),
+        conf.getInt("io.file.buffer.size", DEFAULT_FILE_BUFFER_SIZE));
+    this.maxFsObjects = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY, 
+                                     DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT);
     this.blockInvalidateLimit = Math.max(this.blockInvalidateLimit, 
                                          20*(int)(heartbeatInterval/1000));
-    this.accessTimePrecision = conf.getLong("dfs.access.time.precision", 0);
+    this.accessTimePrecision = conf.getLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, 0);
     this.supportAppends = conf.getBoolean("dfs.support.append", false);
     this.isAccessTokenEnabled = conf.getBoolean(
-        AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, false);
+        DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, 
+        DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT);
     if (isAccessTokenEnabled) {
       this.accessKeyUpdateInterval = conf.getLong(
-          AccessTokenHandler.STRING_ACCESS_KEY_UPDATE_INTERVAL, 600) * 60 * 1000L; // 10 hrs
+          DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_KEY, 
+          DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_DEFAULT) * 60 * 1000L; // 10 hrs
       this.accessTokenLifetime = conf.getLong(
-          AccessTokenHandler.STRING_ACCESS_TOKEN_LIFETIME, 600) * 60 * 1000L; // 10 hrs
+          DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY, 
+          DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT) * 60 * 1000L; // 10 hrs
     }
     LOG.info("isAccessTokenEnabled=" + isAccessTokenEnabled
         + " accessKeyUpdateInterval=" + accessKeyUpdateInterval / (60 * 1000)
@@ -530,11 +553,22 @@
    */
   synchronized void metaSave(String filename) throws IOException {
     checkSuperuserPrivilege();
-    File file = new File(System.getProperty("hadoop.log.dir"), 
-                         filename);
-    PrintWriter out = new PrintWriter(new BufferedWriter(
-                                                         new FileWriter(file, true)));
-
+    File file = new File(System.getProperty("hadoop.log.dir"), filename);
+    PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(file,
+        true)));
+
+    long totalInodes = this.dir.totalInodes();
+    long totalBlocks = this.getBlocksTotal();
+
+    ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
+    ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
+    this.DFSNodesStatus(live, dead);
+    
+    String str = totalInodes + " files and directories, " + totalBlocks
+        + " blocks = " + (totalInodes + totalBlocks) + " total";
+    out.println(str);
+    out.println("Live Datanodes: "+live.size());
+    out.println("Dead Datanodes: "+dead.size());
     blockManager.metaSave(out);
 
     //
@@ -547,7 +581,11 @@
   }
 
   long getDefaultBlockSize() {
-    return defaultBlockSize;
+    return serverDefaults.getBlockSize();
+  }
+
+  FsServerDefaults getServerDefaults() {
+    return serverDefaults;
   }
 
   long getAccessTimePrecision() {
@@ -595,13 +633,18 @@
     }
     List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
     long totalSize = 0;
+    BlockInfo curBlock;
     while(totalSize<size && iter.hasNext()) {
-      totalSize += addBlock(iter.next(), results);
+      curBlock = iter.next();
+      if(!curBlock.isComplete())  continue;
+      totalSize += addBlock(curBlock, results);
     }
     if(totalSize<size) {
       iter = node.getBlockIterator(); // start from the beginning
       for(int i=0; i<startBlock&&totalSize<size; i++) {
-        totalSize += addBlock(iter.next(), results);
+        curBlock = iter.next();
+        if(!curBlock.isComplete())  continue;
+        totalSize += addBlock(curBlock, results);
       }
     }
 
@@ -684,8 +727,7 @@
 
   /**
    * Get block locations within the specified range.
-   * 
-   * @see #getBlockLocations(String, long, long)
+   * @see ClientProtocol#getBlockLocations(String, long, long)
    */
   LocatedBlocks getBlockLocations(String clientMachine, String src,
       long offset, long length) throws IOException {
@@ -708,18 +750,9 @@
   /**
    * Get block locations within the specified range.
    * @see ClientProtocol#getBlockLocations(String, long, long)
-   */
-  public LocatedBlocks getBlockLocations(String src, long offset, long length
-      ) throws IOException {
-    return getBlockLocations(src, offset, length, false);
-  }
-
-  /**
-   * Get block locations within the specified range.
-   * @see ClientProtocol#getBlockLocations(String, long, long)
    * @throws FileNotFoundException
    */
-  public LocatedBlocks getBlockLocations(String src, long offset, long length,
+  LocatedBlocks getBlockLocations(String src, long offset, long length,
       boolean doAccessTime) throws IOException {
     if (offset < 0) {
       throw new IOException("Negative offset is not supported. File: " + src );
@@ -727,11 +760,8 @@
     if (length < 0) {
       throw new IOException("Negative length is not supported. File: " + src );
     }
-    INodeFile inode = dir.getFileINode(src);
-    if (inode == null)
-      throw new FileNotFoundException();
-    final LocatedBlocks ret = getBlockLocationsInternal(src, inode,
-        offset, length, Integer.MAX_VALUE, doAccessTime);  
+    final LocatedBlocks ret = getBlockLocationsInternal(src,
+        offset, length, doAccessTime);  
     if (auditLog.isInfoEnabled()) {
       logAuditEvent(UserGroupInformation.getCurrentUGI(),
                     Server.getRemoteIp(),
@@ -741,29 +771,196 @@
   }
 
   private synchronized LocatedBlocks getBlockLocationsInternal(String src,
-                                                       INodeFile inode,
                                                        long offset, 
                                                        long length,
-                                                       int nrBlocksToReturn,
                                                        boolean doAccessTime
                                                        ) throws IOException {
-    if(inode == null) {
-      return null;
-    }
+    INodeFile inode = dir.getFileINode(src);
+    if (inode == null)
+      throw new FileNotFoundException();
     if (doAccessTime && isAccessTimeSupported()) {
       dir.setTimes(src, inode, -1, now(), false);
     }
-    Block[] blocks = inode.getBlocks();
+    final BlockInfo[] blocks = inode.getBlocks();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
+    }
     if (blocks == null) {
       return null;
     }
+
     if (blocks.length == 0) {
-      return inode.createLocatedBlocks(new ArrayList<LocatedBlock>(blocks.length));
+      return new LocatedBlocks(0, inode.isUnderConstruction(),
+          Collections.<LocatedBlock>emptyList(), null, false);
+    } else {
+      final long n = inode.computeFileSize(false);
+      final List<LocatedBlock> locatedblocks = blockManager.getBlockLocations(
+          blocks, offset, length, Integer.MAX_VALUE);
+      final BlockInfo last = inode.getLastBlock();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("last = " + last);
+      }
+
+      if (!last.isComplete()) {
+        return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
+            blockManager.getBlockLocation(last, n), false);
+      }
+      else {
+        return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
+            blockManager.getBlockLocation(last, n-last.getNumBytes()), true);
+      }
+    }
+  }
+
+  /** Create a LocatedBlock. */
+  LocatedBlock createLocatedBlock(final Block b, final DatanodeInfo[] locations,
+      final long offset, final boolean corrupt) throws IOException {
+    final LocatedBlock lb = new LocatedBlock(b, locations, offset, corrupt);
+    if (isAccessTokenEnabled) {
+      lb.setAccessToken(accessTokenHandler.generateToken(b.getBlockId(),
+          EnumSet.of(AccessTokenHandler.AccessMode.READ)));
+    }
+    return lb;
+  }
+  
+  /**
+   * Moves all the blocks from srcs and appends them to trg
+   * To avoid rollbacks we will verify validitity of ALL of the args
+   * before we start actual move.
+   * @param target
+   * @param srcs
+   * @throws IOException
+   */
+  public void concat(String target, String [] srcs) throws IOException{
+    FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) + " to " + target);
+    // check safe mode
+    if (isInSafeMode()) {
+      throw new SafeModeException("concat: cannot concat " + target, safeMode);
+    }
+    
+    // verify args
+    if(target.isEmpty()) {
+      throw new IllegalArgumentException("concat: trg file name is empty");
+    }
+    if(srcs == null || srcs.length == 0) {
+      throw new IllegalArgumentException("concat:  srcs list is empty or null");
+    }
+    
+    // curretnly we require all the files to be in the same dir
+    String trgParent = 
+      target.substring(0, target.lastIndexOf(Path.SEPARATOR_CHAR));
+    for(String s : srcs) {
+      String srcParent = s.substring(0, s.lastIndexOf(Path.SEPARATOR_CHAR));
+      if(! srcParent.equals(trgParent)) {
+        throw new IllegalArgumentException
+           ("concat:  srcs and target shoould be in same dir");
+      }
+    }
+    
+    synchronized(this) {
+      // write permission for the target
+      if (isPermissionEnabled) {
+        checkPathAccess(target, FsAction.WRITE);
+
+        // and srcs
+        for(String aSrc: srcs) {
+          checkPathAccess(aSrc, FsAction.READ); // read the file
+          checkParentAccess(aSrc, FsAction.WRITE); // for delete 
+        }
+      }
+
+
+      // to make sure no two files are the same
+      Set<INode> si = new HashSet<INode>();
+
+      // we put the following prerequisite for the operation
+      // replication and blocks sizes should be the same for ALL the blocks
+      // check the target
+      INode inode = dir.getFileINode(target);
+
+      if(inode == null) {
+        throw new IllegalArgumentException("concat: trg file doesn't exist");
+      }
+      if(inode.isUnderConstruction()) {
+        throw new IllegalArgumentException("concat: trg file is uner construction");
+      }
+
+      INodeFile trgInode = (INodeFile) inode;
+
+      // per design trg shouldn't be empty and all the blocks same size
+      if(trgInode.blocks.length == 0) {
+        throw new IllegalArgumentException("concat: "+ target + " file is empty");
+      }
+
+      long blockSize = trgInode.preferredBlockSize;
+
+      // check the end block to be full
+      if(blockSize != trgInode.blocks[trgInode.blocks.length-1].getNumBytes()) {
+        throw new IllegalArgumentException(target + " blocks size should be the same");
+      }
+
+      si.add(trgInode);
+      short repl = trgInode.blockReplication;
+
+      // now check the srcs
+      boolean endSrc = false; // final src file doesn't have to have full end block
+      for(int i=0; i<srcs.length; i++) {
+        String src = srcs[i];
+        if(i==srcs.length-1)
+          endSrc=true;
+
+        INodeFile srcInode = dir.getFileINode(src);
+
+        if(src.isEmpty() 
+            || srcInode == null
+            || srcInode.isUnderConstruction()
+            || srcInode.blocks.length == 0) {
+          throw new IllegalArgumentException("concat: file " + src + 
+          " is invalid or empty or underConstruction");
+        }
+
+        // check replication and blocks size
+        if(repl != srcInode.blockReplication) {
+          throw new IllegalArgumentException(src + " and " + target + " " +
+              "should have same replication: "
+              + repl + " vs. " + srcInode.blockReplication);
+        }
+
+        //boolean endBlock=false;
+        // verify that all the blocks are of the same length as target
+        // should be enough to check the end blocks
+        int idx = srcInode.blocks.length-1;
+        if(endSrc)
+          idx = srcInode.blocks.length-2; // end block of endSrc is OK not to be full
+        if(idx >= 0 && srcInode.blocks[idx].getNumBytes() != blockSize) {
+          throw new IllegalArgumentException("concat: blocks sizes of " + 
+              src + " and " + target + " should all be the same");
+        }
+
+        si.add(srcInode);
+      }
+
+      // make sure no two files are the same
+      if(si.size() < srcs.length+1) { // trg + srcs
+        // it means at least two files are the same
+        throw new IllegalArgumentException("at least two files are the same");
+      }
+
+      NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " + 
+          Arrays.toString(srcs) + " to " + target);
+
+      dir.concatInternal(target,srcs);
     }
+    getEditLog().logSync();
+   
     
-    List<LocatedBlock> results = blockManager.getBlockLocations(blocks,
-        offset, length, nrBlocksToReturn);
-    return inode.createLocatedBlocks(results);
+    if (auditLog.isInfoEnabled()) {
+      final FileStatus stat = dir.getFileInfo(target);
+      logAuditEvent(UserGroupInformation.getCurrentUGI(),
+                    Server.getRemoteIp(),
+                    "concat", Arrays.toString(srcs), target, stat);
+    }
+   
   }
 
   /**
@@ -863,6 +1060,24 @@
     return dir.getPreferredBlockSize(filename);
   }
 
+  /*
+   * Verify that parent dir exists
+   */
+  private void verifyParentDir(String src) throws FileAlreadyExistsException,
+      FileNotFoundException {
+    Path parent = new Path(src).getParent();
+    if (parent != null) {
+      INode[] pathINodes = dir.getExistingPathINodes(parent.toString());
+      if (pathINodes[pathINodes.length - 1] == null) {
+        throw new FileNotFoundException("Parent directory doesn't exist: "
+            + parent.toString());
+      } else if (!pathINodes[pathINodes.length - 1].isDirectory()) {
+        throw new FileAlreadyExistsException("Parent path is not a directory: "
+            + parent.toString());
+      }
+    }
+  }
+
   /**
    * Create a new file entry in the namespace.
    * 
@@ -873,10 +1088,11 @@
    */
   void startFile(String src, PermissionStatus permissions,
                  String holder, String clientMachine,
-                 EnumSet<CreateFlag> flag, short replication, long blockSize
+                 EnumSet<CreateFlag> flag, boolean createParent, 
+                 short replication, long blockSize
                 ) throws IOException {
     startFileInternal(src, permissions, holder, clientMachine, flag,
-        replication, blockSize);
+        createParent, replication, blockSize);
     getEditLog().logSync();
     if (auditLog.isInfoEnabled()) {
       final FileStatus stat = dir.getFileInfo(src);
@@ -891,6 +1107,7 @@
                                               String holder, 
                                               String clientMachine, 
                                               EnumSet<CreateFlag> flag,
+                                              boolean createParent,
                                               short replication,
                                               long blockSize
                                               ) throws IOException {
@@ -902,6 +1119,7 @@
       NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src
           + ", holder=" + holder
           + ", clientMachine=" + clientMachine
+          + ", createParent=" + createParent
           + ", replication=" + replication
           + ", overwrite=" + overwrite
           + ", append=" + append);
@@ -928,6 +1146,10 @@
       }
     }
 
+    if (!createParent) {
+      verifyParentDir(src);
+    }
+
     try {
       INode myFile = dir.getFileINode(src);
       if (myFile != null && myFile.isUnderConstruction()) {
@@ -936,40 +1158,45 @@
         // If the file is under construction , then it must be in our
         // leases. Find the appropriate lease record.
         //
-        Lease lease = leaseManager.getLease(holder);
-        //
-        // We found the lease for this file. And surprisingly the original
-        // holder is trying to recreate this file. This should never occur.
-        //
-        if (lease != null) {
+        Lease lease = leaseManager.getLeaseByPath(src);
+        if (lease == null) {
           throw new AlreadyBeingCreatedException(
-                                                 "failed to create file " + src + " for " + holder +
-                                                 " on client " + clientMachine + 
-                                                 " because current leaseholder is trying to recreate file.");
+              "failed to create file " + src + " for " + holder +
+              " on client " + clientMachine + 
+              " because pendingCreates is non-null but no leases found.");
         }
         //
-        // Find the original holder.
+        // We found the lease for this file. And surprisingly the original
+        // holder is trying to recreate this file. This should never occur.
         //
-        lease = leaseManager.getLease(pendingFile.clientName);
-        if (lease == null) {
+        if (lease.getHolder().equals(holder)) {
           throw new AlreadyBeingCreatedException(
-                                                 "failed to create file " + src + " for " + holder +
-                                                 " on client " + clientMachine + 
-                                                 " because pendingCreates is non-null but no leases found.");
+              "failed to create file " + src + " for " + holder +
+              " on client " + clientMachine + 
+              " because current leaseholder is trying to recreate file.");
         }
+        assert lease.getHolder().equals(pendingFile.getClientName()) :
+          "Current lease holder " + lease.getHolder() +
+          " does not match file creator " + pendingFile.getClientName();
         //
+        // Current lease holder is different from the requester.
         // If the original holder has not renewed in the last SOFTLIMIT 
-        // period, then start lease recovery.
+        // period, then start lease recovery, otherwise fail.
         //
         if (lease.expiredSoftLimit()) {
           LOG.info("startFile: recover lease " + lease + ", src=" + src);
-          internalReleaseLease(lease, src);
-        }
-        throw new AlreadyBeingCreatedException("failed to create file " + src + " for " + holder +
-                                               " on client " + clientMachine + 
-                                               ", because this file is already being created by " +
-                                               pendingFile.getClientName() + 
-                                               " on " + pendingFile.getClientMachine());
+          boolean isClosed = internalReleaseLease(lease, src, null);
+          if(!isClosed)
+            throw new RecoveryInProgressException(
+                "Failed to close file " + src +
+                ". Lease recovery is in progress. Try again later.");
+
+        } else
+          throw new AlreadyBeingCreatedException("failed to create file " +
+              src + " for " + holder + " on client " + clientMachine + 
+              ", because this file is already being created by " +
+              pendingFile.getClientName() + 
+              " on " + pendingFile.getClientMachine());
       }
 
       try {
@@ -985,7 +1212,7 @@
           else {
             //append & create a nonexist file equals to overwrite
             this.startFileInternal(src, permissions, holder, clientMachine,
-                EnumSet.of(CreateFlag.OVERWRITE), replication, blockSize);
+                EnumSet.of(CreateFlag.OVERWRITE), createParent, replication, blockSize);
             return;
           }
         } else if (myFile.isDirectory()) {
@@ -1022,7 +1249,7 @@
                                         clientMachine,
                                         clientNode);
         dir.replaceNode(src, node, cons);
-        leaseManager.addLease(cons.clientName, src);
+        leaseManager.addLease(cons.getClientName(), src);
 
       } else {
        // Now we can add the name to the filesystem. This file has no
@@ -1038,7 +1265,7 @@
           throw new IOException("DIR* NameSystem.startFile: " +
                                 "Unable to add file to namespace.");
         }
-        leaseManager.addLease(newNode.clientName, src);
+        leaseManager.addLease(newNode.getClientName(), src);
         if (NameNode.stateChangeLog.isDebugEnabled()) {
           NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
                                      +"add "+src+" to namespace for "+holder);
@@ -1061,7 +1288,7 @@
                             " Please refer to dfs.support.append configuration parameter.");
     }
     startFileInternal(src, null, holder, clientMachine, EnumSet.of(CreateFlag.APPEND), 
-                      (short)blockManager.maxReplication, (long)0);
+                      false, (short)blockManager.maxReplication, (long)0);
     getEditLog().logSync();
 
     //
@@ -1072,40 +1299,36 @@
     LocatedBlock lb = null;
     synchronized (this) {
       INodeFileUnderConstruction file = (INodeFileUnderConstruction)dir.getFileINode(src);
-
-      BlockInfo[] blocks = file.getBlocks();
-      if (blocks != null && blocks.length > 0) {
-        BlockInfo last = blocks[blocks.length-1];
-        // this is a redundant search in blocksMap
-        // should be resolved by the new implementation of append
-        BlockInfo storedBlock = blockManager.getStoredBlock(last);
-        assert last == storedBlock : "last block should be in the blocksMap";
-        if (file.getPreferredBlockSize() > storedBlock.getNumBytes()) {
+      BlockInfo lastBlock = file.getLastBlock();
+      if (lastBlock != null) {
+        assert lastBlock == blockManager.getStoredBlock(lastBlock) :
+          "last block of the file is not in blocksMap";
+        if (file.getPreferredBlockSize() > lastBlock.getNumBytes()) {
           long fileLength = file.computeContentSummary().getLength();
-          DatanodeDescriptor[] targets = blockManager.getNodes(storedBlock);
+          DatanodeDescriptor[] targets = blockManager.getNodes(lastBlock);
           // remove the replica locations of this block from the node
           for (int i = 0; i < targets.length; i++) {
-            targets[i].removeBlock(storedBlock);
+            targets[i].removeBlock(lastBlock);
           }
-          // set the locations of the last block in the lease record
-          file.setLastBlock(storedBlock, targets);
+          // convert last block to under-construction and set its locations
+          blockManager.convertLastBlockToUnderConstruction(file, targets);
 
-          lb = new LocatedBlock(last, targets, 
-                                fileLength-storedBlock.getNumBytes());
+          lb = new LocatedBlock(lastBlock, targets, 
+                                fileLength-lastBlock.getNumBytes());
           if (isAccessTokenEnabled) {
             lb.setAccessToken(accessTokenHandler.generateToken(lb.getBlock()
                 .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
           }
 
           // Remove block from replication queue.
-          blockManager.updateNeededReplications(last, 0, 0);
+          blockManager.updateNeededReplications(lastBlock, 0, 0);
 
           // remove this block from the list of pending blocks to be deleted. 
           // This reduces the possibility of triggering HADOOP-1349.
           //
           for (DatanodeDescriptor dd : targets) {
             String datanodeId = dd.getStorageID();
-            blockManager.removeFromInvalidates(datanodeId, last);
+            blockManager.removeFromInvalidates(datanodeId, lastBlock);
           }
         }
       }
@@ -1138,8 +1361,17 @@
    * are replicated.  Will return an empty 2-elt array if we want the
    * client to "try again later".
    */
-  public LocatedBlock getAdditionalBlock(String src, 
-                                         String clientName
+  public LocatedBlock getAdditionalBlock(String src,
+                                         String clientName,
+                                         Block previous
+                                         ) throws IOException {
+    return getAdditionalBlock(src, clientName, previous, null);
+  }
+
+  public LocatedBlock getAdditionalBlock(String src,
+                                         String clientName,
+                                         Block previous,
+                                         HashMap<Node, Node> excludedNodes
                                          ) throws IOException {
     long fileLength, blockSize;
     int replication;
@@ -1159,6 +1391,9 @@
 
       INodeFileUnderConstruction pendingFile  = checkLease(src, clientName);
 
+      // commit the last block
+      blockManager.commitLastBlock(pendingFile, previous);
+
       //
       // If we fail this, bad things happen!
       //
@@ -1173,7 +1408,7 @@
 
     // choose targets for the new block to be allocated.
     DatanodeDescriptor targets[] = blockManager.replicator.chooseTarget(
-        replication, clientNode, null, blockSize);
+        src, replication, clientNode, excludedNodes, blockSize);
     if (targets.length < blockManager.minReplication) {
       throw new IOException("File " + src + " could only be replicated to " +
                             targets.length + " nodes, instead of " +
@@ -1195,9 +1430,11 @@
         throw new NotReplicatedYetException("Not replicated yet:" + src);
       }
 
+      // complete the penultimate block
+      blockManager.completeBlock(pendingFile, pendingFile.numBlocks()-2);
+
       // allocate new block record block locations in INode.
-      newBlock = allocateBlock(src, pathINodes);
-      pendingFile.setTargets(targets);
+      newBlock = allocateBlock(src, pathINodes, targets);
       
       for (DatanodeDescriptor dn : targets) {
         dn.incBlocksScheduled();
@@ -1277,15 +1514,18 @@
     COMPLETE_SUCCESS
   }
   
-  public CompleteFileStatus completeFile(String src, String holder) throws IOException {
-    CompleteFileStatus status = completeFileInternal(src, holder);
+  public CompleteFileStatus completeFile(String src,
+                                         String holder,
+                                         Block last) throws IOException {
+    CompleteFileStatus status = completeFileInternal(src, holder, last);
     getEditLog().logSync();
     return status;
   }
 
-
-  private synchronized CompleteFileStatus completeFileInternal(String src, 
-                                                String holder) throws IOException {
+  private synchronized CompleteFileStatus completeFileInternal(
+                                            String src, 
+                                            String holder,
+                                            Block last) throws IOException {
     NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder);
     if (isInSafeMode())
       throw new SafeModeException("Cannot complete file " + src, safeMode);
@@ -1306,7 +1546,12 @@
                                      ("from " + pendingFile.getClientMachine()))
                                   );                      
       return CompleteFileStatus.OPERATION_FAILED;
-    } else if (!checkFileProgress(pendingFile, true)) {
+    } 
+
+    // commit the last block
+    blockManager.commitLastBlock(pendingFile, last);
+
+    if (!checkFileProgress(pendingFile, true)) {
       return CompleteFileStatus.STILL_WAITING;
     }
 
@@ -1339,13 +1584,15 @@
    * @param inodes INode representing each of the components of src. 
    *        <code>inodes[inodes.length-1]</code> is the INode for the file.
    */
-  private Block allocateBlock(String src, INode[] inodes) throws IOException {
+  private Block allocateBlock(String src,
+                              INode[] inodes,
+                              DatanodeDescriptor targets[]) throws IOException {
     Block b = new Block(FSNamesystem.randBlockId.nextLong(), 0, 0); 
     while(isValidBlock(b)) {
       b.setBlockId(FSNamesystem.randBlockId.nextLong());
     }
     b.setGenerationStamp(getGenerationStamp());
-    b = dir.addBlock(src, inodes, b);
+    b = dir.addBlock(src, inodes, b, targets);
     NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: "
                                  +src+ ". "+b);
     return b;
@@ -1356,13 +1603,16 @@
    * replicated.  If not, return false. If checkall is true, then check
    * all blocks, otherwise check only penultimate block.
    */
-  synchronized boolean checkFileProgress(INodeFile v, boolean checkall) {
+  synchronized boolean checkFileProgress(INodeFile v, boolean checkall) throws IOException {
     if (checkall) {
       //
       // check all blocks of the file.
       //
-      for (Block block: v.getBlocks()) {
+      for (BlockInfo block: v.getBlocks()) {
         if (!blockManager.checkMinReplication(block)) {
+          LOG.info("BLOCK* NameSystem.checkFileProgress: "
+              + "block " + block + " has not reached minimal replication "
+              + blockManager.minReplication);
           return false;
         }
       }
@@ -1370,8 +1620,11 @@
       //
       // check the penultimate block of this file
       //
-      Block b = v.getPenultimateBlock();
+      BlockInfo b = v.getPenultimateBlock();
       if (b != null && !blockManager.checkMinReplication(b)) {
+        LOG.info("BLOCK* NameSystem.checkFileProgress: "
+            + "block " + b + " has not reached minimal replication "
+            + blockManager.minReplication);
         return false;
       }
     }
@@ -1401,8 +1654,12 @@
   // are made, edit namespace and return to client.
   ////////////////////////////////////////////////////////////////
 
-  /** Change the indicated filename. */
-  public boolean renameTo(String src, String dst) throws IOException {
+  /** 
+   * Change the indicated filename. 
+   * @deprecated Use {@link #renameTo(String, String, Options.Rename...)} instead.
+   */
+  @Deprecated
+  boolean renameTo(String src, String dst) throws IOException {
     boolean status = renameToInternal(src, dst);
     getEditLog().logSync();
     if (status && auditLog.isInfoEnabled()) {
@@ -1414,6 +1671,8 @@
     return status;
   }
 
+  /** @deprecated See {@link #renameTo(String, String)} */
+  @Deprecated
   private synchronized boolean renameToInternal(String src, String dst
       ) throws IOException {
     NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src + " to " + dst);
@@ -1439,7 +1698,46 @@
     }
     return false;
   }
+  
+
+  /** Rename src to dst */
+  void renameTo(String src, String dst, Options.Rename... options)
+      throws IOException {
+    renameToInternal(src, dst, options);
+    getEditLog().logSync();
+    if (auditLog.isInfoEnabled()) {
+      StringBuilder cmd = new StringBuilder("rename options=");
+      for (Rename option : options) {
+        cmd.append(option.value()).append(" ");
+      }
+      final FileStatus stat = dir.getFileInfo(dst);
+      logAuditEvent(UserGroupInformation.getCurrentUGI(), Server.getRemoteIp(),
+                    cmd.toString(), src, dst, stat);
+    }
+  }
+
+  private synchronized void renameToInternal(String src, String dst,
+      Options.Rename... options) throws IOException {
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
+          + src + " to " + dst);
+    }
+    if (isInSafeMode()) {
+      throw new SafeModeException("Cannot rename " + src, safeMode);
+    }
+    if (!DFSUtil.isValidName(dst)) {
+      throw new IOException("Invalid name: " + dst);
+    }
+    if (isPermissionEnabled) {
+      checkParentAccess(src, FsAction.WRITE);
+      checkAncestorAccess(dst, FsAction.WRITE);
+    }
 
+    FileStatus dinfo = dir.getFileInfo(dst);
+    dir.renameTo(src, dst, options);
+    changeLease(src, dst, dinfo); // update lease with new filename
+  }
+  
   /**
    * Remove the indicated filename from namespace. If the filename 
    * is a directory (non empty) and recursive is set to false then throw exception.
@@ -1448,8 +1746,10 @@
       if ((!recursive) && (!dir.isDirEmpty(src))) {
         throw new IOException(src + " is non empty");
       }
+      if (NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
+      }
       boolean status = deleteInternal(src, true);
-      getEditLog().logSync();
       if (status && auditLog.isInfoEnabled()) {
         logAuditEvent(UserGroupInformation.getCurrentUGI(),
                       Server.getRemoteIp(),
@@ -1459,25 +1759,68 @@
     }
     
   /**
-   * Remove the indicated filename from the namespace.  This may
-   * invalidate some blocks that make up the file.
+   * Remove a file/directory from the namespace.
+   * <p>
+   * For large directories, deletion is incremental. The blocks under
+   * the directory are collected and deleted a small number at a time holding
+   * the {@link FSNamesystem} lock.
+   * <p>
+   * For small directory or file the deletion is done in one shot.
    */
-  synchronized boolean deleteInternal(String src, 
+  private boolean deleteInternal(String src, 
       boolean enforcePermission) throws IOException {
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
+    boolean deleteNow = false;
+    ArrayList<Block> collectedBlocks = new ArrayList<Block>();
+    synchronized(this) {
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot delete " + src, safeMode);
+      }
+      if (enforcePermission && isPermissionEnabled) {
+        checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL);
+      }
+      // Unlink the target directory from directory tree
+      if (!dir.delete(src, collectedBlocks)) {
+        return false;
+      }
+      deleteNow = collectedBlocks.size() <= BLOCK_DELETION_INCREMENT;
+      if (deleteNow) { // Perform small deletes right away
+        removeBlocks(collectedBlocks);
+      }
     }
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot delete " + src, safeMode);
-    if (enforcePermission && isPermissionEnabled) {
-      checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL);
+    // Log directory deletion to editlog
+    getEditLog().logSync();
+    if (!deleteNow) {
+      removeBlocks(collectedBlocks); // Incremental deletion of blocks
     }
-
-    return dir.delete(src) != null;
+    collectedBlocks.clear();
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* Namesystem.delete: "
+        + src +" is removed");
+    }
+    return true;
   }
 
+  /** From the given list, incrementally remove the blocks from blockManager */
+  private void removeBlocks(List<Block> blocks) {
+    int start = 0;
+    int end = 0;
+    while (start < blocks.size()) {
+      end = BLOCK_DELETION_INCREMENT + start;
+      end = end > blocks.size() ? blocks.size() : end;
+      synchronized(this) {
+        for (int i=start; i<end; i++) {
+          blockManager.removeBlock(blocks.get(i));
+        }
+      }
+      start = end;
+    }
+  }
+  
   void removePathAndBlocks(String src, List<Block> blocks) {
     leaseManager.removeLeaseWithPrefixPath(src);
+    if (blocks == null) {
+      return;
+    }
     for(Block b : blocks) {
       blockManager.removeBlock(b);
     }
@@ -1502,9 +1845,9 @@
   /**
    * Create all the necessary directories
    */
-  public boolean mkdirs(String src, PermissionStatus permissions
-      ) throws IOException {
-    boolean status = mkdirsInternal(src, permissions);
+  public boolean mkdirs(String src, PermissionStatus permissions,
+      boolean createParent) throws IOException {
+    boolean status = mkdirsInternal(src, permissions, createParent);
     getEditLog().logSync();
     if (status && auditLog.isInfoEnabled()) {
       final FileStatus stat = dir.getFileInfo(src);
@@ -1519,7 +1862,7 @@
    * Create all the necessary directories
    */
   private synchronized boolean mkdirsInternal(String src,
-      PermissionStatus permissions) throws IOException {
+      PermissionStatus permissions, boolean createParent) throws IOException {
     NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
     if (isPermissionEnabled) {
       checkTraverse(src);
@@ -1538,6 +1881,10 @@
       checkAncestorAccess(src, FsAction.WRITE);
     }
 
+    if (!createParent) {
+      verifyParentDir(src);
+    }
+
     // validate that we have enough inodes. This is, at best, a 
     // heuristic because the mkdirs() operation migth need to 
     // create multiple inodes.
@@ -1592,20 +1939,31 @@
    * Move a file that is being written to be immutable.
    * @param src The filename
    * @param lease The lease for the client creating the file
-   */
-  void internalReleaseLease(Lease lease, String src) throws IOException {
+   * @param recoveryLeaseHolder reassign lease to this holder if the last block
+   *        needs recovery; keep current holder if null.
+   * @throws AlreadyBeingCreatedException if file is waiting to achieve minimal
+   *         replication;<br>
+   *         RecoveryInProgressException if lease recovery is in progress.<br>
+   *         IOException in case of an error.
+   * @return true  if file has been successfully finalized and closed or 
+   *         false if block recovery has been initiated
+   */
+  boolean internalReleaseLease(
+      Lease lease, String src, String recoveryLeaseHolder)
+  throws AlreadyBeingCreatedException,
+         IOException {
     LOG.info("Recovering lease=" + lease + ", src=" + src);
 
     INodeFile iFile = dir.getFileINode(src);
     if (iFile == null) {
-      final String message = "DIR* NameSystem.internalReleaseCreate: "
+      final String message = "DIR* NameSystem.internalReleaseLease: "
         + "attempt to release a create lock on "
         + src + " file does not exist.";
       NameNode.stateChangeLog.warn(message);
       throw new IOException(message);
     }
     if (!iFile.isUnderConstruction()) {
-      final String message = "DIR* NameSystem.internalReleaseCreate: "
+      final String message = "DIR* NameSystem.internalReleaseLease: "
         + "attempt to release a create lock on "
         + src + " but file is already closed.";
       NameNode.stateChangeLog.warn(message);
@@ -1613,39 +1971,123 @@
     }
 
     INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) iFile;
+    int nrBlocks = pendingFile.numBlocks();
+    BlockInfo[] blocks = pendingFile.getBlocks();
+
+    int nrCompleteBlocks;
+    BlockInfo curBlock = null;
+    for(nrCompleteBlocks = 0; nrCompleteBlocks < nrBlocks; nrCompleteBlocks++) {
+      curBlock = blocks[nrCompleteBlocks];
+      if(!curBlock.isComplete())
+        break;
+      assert blockManager.checkMinReplication(curBlock) :
+              "A COMPLETE block is not minimally replicated in " + src;
+    }
+
+    // If there are no incomplete blocks associated with this file,
+    // then reap lease immediately and close the file.
+    if(nrCompleteBlocks == nrBlocks) {
+      finalizeINodeFileUnderConstruction(src, pendingFile);
+      NameNode.stateChangeLog.warn("BLOCK*"
+        + " internalReleaseLease: All existing blocks are COMPLETE,"
+        + " lease removed, file closed.");
+      return true;  // closed!
+    }
+
+    // Only the last and the penultimate blocks may be in non COMPLETE state.
+    // If the penultimate block is not COMPLETE, then it must be COMMITTED.
+    if(nrCompleteBlocks < nrBlocks - 2 ||
+       nrCompleteBlocks == nrBlocks - 2 &&
+         curBlock.getBlockUCState() != BlockUCState.COMMITTED) {
+      final String message = "DIR* NameSystem.internalReleaseLease: "
+        + "attempt to release a create lock on "
+        + src + " but file is already closed.";
+      NameNode.stateChangeLog.warn(message);
+      throw new IOException(message);
+    }
 
-    // Initialize lease recovery for pendingFile. If there are no blocks 
-    // associated with this file, then reap lease immediately. Otherwise 
-    // renew the lease and trigger lease recovery.
-    if (pendingFile.getTargets() == null ||
-        pendingFile.getTargets().length == 0) {
-      if (pendingFile.getBlocks().length == 0) {
+    // no we know that the last block is not COMPLETE, and
+    // that the penultimate block if exists is either COMPLETE or COMMITTED
+    BlockInfoUnderConstruction lastBlock = pendingFile.getLastBlock();
+    BlockUCState lastBlockState = lastBlock.getBlockUCState();
+    BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
+    BlockUCState penultimateBlockState = (penultimateBlock == null ?
+        BlockUCState.COMPLETE : penultimateBlock.getBlockUCState());
+    assert penultimateBlockState == BlockUCState.COMPLETE ||
+           penultimateBlockState == BlockUCState.COMMITTED :
+           "Unexpected state of penultimate block in " + src;
+
+    switch(lastBlockState) {
+    case COMPLETE:
+      assert false : "Already checked that the last block is incomplete";
+      break;
+    case COMMITTED:
+      // Close file if committed blocks are minimally replicated
+      if(blockManager.checkMinReplication(penultimateBlock) &&
+          blockManager.checkMinReplication(lastBlock)) {
         finalizeINodeFileUnderConstruction(src, pendingFile);
         NameNode.stateChangeLog.warn("BLOCK*"
-          + " internalReleaseLease: No blocks found, lease removed.");
-        return;
-      }
-      // setup the Inode.targets for the last block from the blockManager
-      //
-      BlockInfo[] blocks = pendingFile.getBlocks();
-      BlockInfo last = blocks[blocks.length-1];
-      DatanodeDescriptor[] targets = blockManager.getNodes(last);
-      pendingFile.setTargets(targets);
+          + " internalReleaseLease: Committed blocks are minimally replicated,"
+          + " lease removed, file closed.");
+        return true;  // closed!
+      }
+      // Cannot close file right now, since some blocks 
+      // are not yet minimally replicated.
+      // This may potentially cause infinite loop in lease recovery
+      // if there are no valid replicas on data-nodes.
+      String message = "DIR* NameSystem.internalReleaseLease: " +
+          "Failed to release lease for file " + src +
+          ". Committed blocks are waiting to be minimally replicated." +
+          " Try again later.";
+      NameNode.stateChangeLog.warn(message);
+      throw new AlreadyBeingCreatedException(message);
+    case UNDER_CONSTRUCTION:
+    case UNDER_RECOVERY:
+      // setup the last block locations from the blockManager if not known
+      if(lastBlock.getNumExpectedLocations() == 0)
+        lastBlock.setExpectedLocations(blockManager.getNodes(lastBlock));
+      // start recovery of the last block for this file
+      long blockRecoveryId = nextGenerationStamp();
+      lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
+      lastBlock.initializeBlockRecovery(blockRecoveryId);
+      leaseManager.renewLease(lease);
+      // Cannot close file right now, since the last block requires recovery.
+      // This may potentially cause infinite loop in lease recovery
+      // if there are no valid replicas on data-nodes.
+      NameNode.stateChangeLog.warn(
+                "DIR* NameSystem.internalReleaseLease: " +
+                "File " + src + " has not been closed." +
+               " Lease recovery is in progress. " +
+                "RecoveryId = " + blockRecoveryId + " for block " + lastBlock);
+      break;
     }
-    // start lease recovery of the last block for this file.
-    pendingFile.assignPrimaryDatanode();
-    leaseManager.renewLease(lease);
+    return false;
+  }
+
+  Lease reassignLease(Lease lease, String src, String newHolder,
+                      INodeFileUnderConstruction pendingFile) {
+    if(newHolder == null)
+      return lease;
+    pendingFile.setClientName(newHolder);
+    return leaseManager.reassignLease(lease, src, newHolder);
   }
 
-  private void finalizeINodeFileUnderConstruction(String src,
+
+  private void finalizeINodeFileUnderConstruction(
+      String src,
       INodeFileUnderConstruction pendingFile) throws IOException {
-    leaseManager.removeLease(pendingFile.clientName, src);
+    leaseManager.removeLease(pendingFile.getClientName(), src);
+
+    // complete the penultimate block
+    blockManager.completeBlock(pendingFile, pendingFile.numBlocks()-2);
 
     // The file is no longer pending.
-    // Create permanent INode, update blockmap
+    // Create permanent INode, update blocks
     INodeFile newFile = pendingFile.convertToInodeFile();
     dir.replaceNode(src, pendingFile, newFile);
 
+    // complete last block of the file
+    blockManager.completeBlock(newFile, newFile.numBlocks()-1);
     // close file and persist block allocations for this file
     dir.closeFile(src, newFile);
 
@@ -1663,30 +2105,35 @@
           + ", closeFile=" + closeFile
           + ", deleteBlock=" + deleteblock
           + ")");
-    final BlockInfo oldblockinfo = blockManager.getStoredBlock(lastblock);
-    if (oldblockinfo == null) {
+    final BlockInfo storedBlock = blockManager.getStoredBlock(lastblock);
+    if (storedBlock == null) {
       throw new IOException("Block (=" + lastblock + ") not found");
     }
-    INodeFile iFile = oldblockinfo.getINode();
-    if (!iFile.isUnderConstruction()) {
+    INodeFile iFile = storedBlock.getINode();
+    if (!iFile.isUnderConstruction() || storedBlock.isComplete()) {
       throw new IOException("Unexpected block (=" + lastblock
           + ") since the file (=" + iFile.getLocalName()
           + ") is not under construction");
     }
-    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
 
-
-    // Remove old block from blocks map. This always have to be done
-    // because the generation stamp of this block is changing.
-    blockManager.removeBlockFromMap(oldblockinfo);
+    long recoveryId =
+      ((BlockInfoUnderConstruction)storedBlock).getBlockRecoveryId();
+    if(recoveryId != newgenerationstamp) {
+      throw new IOException("The recovery id " + newgenerationstamp
+          + " does not match current recovery id "
+          + recoveryId + " for block " + lastblock); 
+    }
+        
+    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
 
     if (deleteblock) {
-      pendingFile.removeBlock(lastblock);
+      pendingFile.removeLastBlock(lastblock);
+      blockManager.removeBlockFromMap(storedBlock);
     }
     else {
-      // update last block, construct newblockinfo and add it to the blocks map
-      lastblock.set(lastblock.getBlockId(), newlength, newgenerationstamp);
-      final BlockInfo newblockinfo = blockManager.addINode(lastblock, pendingFile);
+      // update last block
+      storedBlock.setGenerationStamp(newgenerationstamp);
+      storedBlock.setNumBytes(newlength);
 
       // find the DatanodeDescriptor objects
       // There should be no locations in the blockManager till now because the
@@ -1703,13 +2150,11 @@
         // Otherwise fsck will report these blocks as MISSING, especially if the
         // blocksReceived from Datanodes take a long time to arrive.
         for (int i = 0; i < descriptors.length; i++) {
-          descriptors[i].addBlock(newblockinfo);
+          descriptors[i].addBlock(storedBlock);
         }
-        pendingFile.setLastBlock(newblockinfo, null);
-      } else {
-        // add locations into the INodeUnderConstruction
-        pendingFile.setLastBlock(newblockinfo, descriptors);
       }
+      // add pipeline locations into the INodeUnderConstruction
+      pendingFile.setLastBlock(storedBlock, descriptors);
     }
 
     // If this commit does not want to close the file, persist
@@ -1723,7 +2168,10 @@
       LOG.info("commitBlockSynchronization(" + lastblock + ") successful");
       return;
     }
-    
+
+    // commit the last block
+    blockManager.commitLastBlock(pendingFile, storedBlock);
+
     //remove lease, close file
     finalizeINodeFileUnderConstruction(src, pendingFile);
     getEditLog().logSync();
@@ -2352,8 +2800,10 @@
   void chooseExcessReplicates(Collection<DatanodeDescriptor> nonExcess, 
                               Block b, short replication,
                               DatanodeDescriptor addedNode,
-                              DatanodeDescriptor delNodeHint) {
+                              DatanodeDescriptor delNodeHint,
+                              BlockPlacementPolicy replicator) {
     // first form a rack to datanodes map and
+    INodeFile inode = blockManager.getINode(b);
     HashMap<String, ArrayList<DatanodeDescriptor>> rackMap =
       new HashMap<String, ArrayList<DatanodeDescriptor>>();
     for (Iterator<DatanodeDescriptor> iter = nonExcess.iterator();
@@ -2390,24 +2840,13 @@
     boolean firstOne = true;
     while (nonExcess.size() - replication > 0) {
       DatanodeInfo cur = null;
-      long minSpace = Long.MAX_VALUE;
 
       // check if we can del delNodeHint
       if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint) &&
             (priSet.contains(delNodeHint) || (addedNode != null && !priSet.contains(addedNode))) ) {
           cur = delNodeHint;
       } else { // regular excessive replica removal
-        Iterator<DatanodeDescriptor> iter = 
-          priSet.isEmpty() ? remains.iterator() : priSet.iterator();
-          while( iter.hasNext() ) {
-            DatanodeDescriptor node = iter.next();
-            long free = node.getRemaining();
-
-            if (minSpace > free) {
-              minSpace = free;
-              cur = node;
-            }
-          }
+        cur = replicator.chooseReplicaToDelete(inode, b, replication, priSet, remains);
       }
 
       firstOne = false;
@@ -2701,14 +3140,11 @@
     if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
       LOG.info("Start Decommissioning node " + node.getName());
       node.startDecommission();
+      node.decommissioningStatus.setStartTime(now());
       //
       // all the blocks that reside on this node have to be 
       // replicated.
-      Iterator<? extends Block> decommissionBlocks = node.getBlockIterator();
-      while(decommissionBlocks.hasNext()) {
-        Block block = decommissionBlocks.next();
-        blockManager.updateNeededReplications(block, -1, 0);
-      }
+      checkDecommissionStateInternal(node);
     }
   }
 
@@ -2831,7 +3267,7 @@
     // Reread the config to get dfs.hosts and dfs.hosts.exclude filenames.
     // Update the file names and refresh internal includes and excludes list
     if (conf == null)
-      conf = new Configuration();
+      conf = new HdfsConfiguration();
     hostsReader.updateFileNames(conf.get("dfs.hosts",""), 
                                 conf.get("dfs.hosts.exclude", ""));
     hostsReader.refresh();
@@ -2984,9 +3420,10 @@
      * @param conf configuration
      */
     SafeModeInfo(Configuration conf) {
-      this.threshold = conf.getFloat("dfs.safemode.threshold.pct", 0.95f);
-      this.extension = conf.getInt("dfs.safemode.extension", 0);
-      this.safeReplication = conf.getInt("dfs.replication.min", 1);
+      this.threshold = conf.getFloat(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, 0.95f);
+      this.extension = conf.getInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);
+      this.safeReplication = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, 
+                                         DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
       this.blockTotal = 0; 
       this.blockSafe = 0;
     }
@@ -3329,7 +3766,7 @@
   void setBlockTotal() {
     if (safeMode == null)
       return;
-    safeMode.setBlockTotal((int)getBlocksTotal());
+    safeMode.setBlockTotal((int)getCompleteBlocksTotal());
   }
 
   /**
@@ -3340,6 +3777,33 @@
   }
 
   /**
+   * Get the total number of COMPLETE blocks in the system.
+   * For safe mode only complete blocks are counted.
+   */
+  long getCompleteBlocksTotal() {
+    // Calculate number of blocks under construction
+    long numUCBlocks = 0;
+    for (Lease lease : leaseManager.getSortedLeases()) {
+      for(String path : lease.getPaths()) {
+        INode node = dir.getFileINode(path);
+        assert node != null : "Found a lease for nonexisting file.";
+        assert node.isUnderConstruction() :
+          "Found a lease for file that is not under construction.";
+        INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node;
+        BlockInfo[] blocks = cons.getBlocks();
+        if(blocks == null)
+          continue;
+        for(BlockInfo b : blocks) {
+          if(!b.isComplete())
+            numUCBlocks++;
+        }
+      }
+    }
+    LOG.info("Number of blocks under construction: " + numUCBlocks);
+    return getBlocksTotal() - numUCBlocks;
+  }
+
+  /**
    * Enter safe mode manually.
    * @throws IOException
    */
@@ -3658,29 +4122,124 @@
     return gs;
   }
 
+  private INodeFileUnderConstruction checkUCBlock(Block block, String clientName) 
+  throws IOException {
+    // check safe mode
+    if (isInSafeMode())
+      throw new SafeModeException("Cannot get a new generation stamp and an " +
+                                "access token for block " + block, safeMode);
+    
+    // check stored block state
+    BlockInfo storedBlock = blockManager.getStoredBlock(block);
+    if (storedBlock == null || 
+        storedBlock.getBlockUCState() != BlockUCState.UNDER_CONSTRUCTION) {
+        throw new IOException(block + 
+            " does not exist or is not under Construction" + storedBlock);
+    }
+    
+    // check file inode
+    INodeFile file = storedBlock.getINode();
+    if (file==null || !file.isUnderConstruction()) {
+      throw new IOException("The file " + storedBlock + 
+          " is belonged to does not exist or it is not under construction.");
+    }
+    
+    // check lease
+    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)file;
+    if (clientName == null || !clientName.equals(pendingFile.getClientName())) {
+      throw new LeaseExpiredException("Lease mismatch: " + block + 
+          " is accessed by a non lease holder " + clientName); 
+    }
+
+    return pendingFile;
+  }
+  
   /**
-   * Verifies that the block is associated with a file that has a lease.
-   * Increments, logs and then returns the stamp
+   * Get a new generation stamp together with an access token for 
+   * a block under construction
+   * 
+   * This method is called for recovering a failed pipeline or setting up
+   * a pipeline to append to a block.
+   * 
+   * @param block a block
+   * @param clientName the name of a client
+   * @return a located block with a new generation stamp and an access token
+   * @throws IOException if any error occurs
+   */
+  synchronized LocatedBlock updateBlockForPipeline(Block block, 
+      String clientName) throws IOException {
+    // check vadility of parameters
+    checkUCBlock(block, clientName);
+    
+    // get a new generation stamp and an access token
+    block.setGenerationStamp(nextGenerationStamp());
+    LocatedBlock locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
+    if (isAccessTokenEnabled) {
+      locatedBlock.setAccessToken(accessTokenHandler.generateToken(
+          block.getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+    }
+    return locatedBlock;
+  }
+  
+  
+  /**
+   * Update a pipeline for a block under construction
+   * 
+   * @param clientName the name of the client
+   * @param oldblock and old block
+   * @param newBlock a new block with a new generation stamp and length
+   * @param newNodes datanodes in the pipeline
+   * @throws IOException if any error occurs
    */
-  synchronized long nextGenerationStampForBlock(Block block) throws IOException {
-    BlockInfo storedBlock = blockManager.getStoredBlock(block);
-    if (storedBlock == null) {
-      String msg = block + " is already commited, storedBlock == null.";
-      LOG.info(msg);
+  synchronized void updatePipeline(String clientName, Block oldBlock, 
+      Block newBlock, DatanodeID[] newNodes)
+      throws IOException {
+    assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and "
+    + oldBlock + " has different block identifier";
+    LOG.info("updatePipeline(block=" + oldBlock
+        + ", newGenerationStamp=" + newBlock.getGenerationStamp()
+        + ", newLength=" + newBlock.getNumBytes()
+        + ", newNodes=" + Arrays.asList(newNodes)
+        + ", clientName=" + clientName
+        + ")");
+
+    // check the vadility of the block and lease holder name
+    final INodeFileUnderConstruction pendingFile = 
+      checkUCBlock(oldBlock, clientName);
+    final BlockInfoUnderConstruction blockinfo = pendingFile.getLastBlock();
+
+    // check new GS & length: this is not expected
+    if (newBlock.getGenerationStamp() <= blockinfo.getGenerationStamp() ||
+        newBlock.getNumBytes() < blockinfo.getNumBytes()) {
+      String msg = "Update " + oldBlock + " (len = " + 
+      blockinfo.getNumBytes() + ") to an older state: " + newBlock + 
+      " (len = " + newBlock.getNumBytes() +")";
+      LOG.warn(msg);
       throw new IOException(msg);
     }
-    INodeFile fileINode = storedBlock.getINode();
-    if (!fileINode.isUnderConstruction()) {
-      String msg = block + " is already commited, !fileINode.isUnderConstruction().";
-      LOG.info(msg);
-      throw new IOException(msg);
+
+    // Update old block with the new generation stamp and new length
+    blockinfo.setGenerationStamp(newBlock.getGenerationStamp());
+    blockinfo.setNumBytes(newBlock.getNumBytes());
+
+    // find the DatanodeDescriptor objects
+    DatanodeDescriptor[] descriptors = null;
+    if (newNodes.length > 0) {
+      descriptors = new DatanodeDescriptor[newNodes.length];
+      for(int i = 0; i < newNodes.length; i++) {
+        descriptors[i] = getDatanode(newNodes[i]);
+      }
     }
-    if (!((INodeFileUnderConstruction)fileINode).setLastRecoveryTime(now())) {
-      String msg = block + " is beening recovered, ignoring this request.";
-      LOG.info(msg);
-      throw new IOException(msg);
+    blockinfo.setExpectedLocations(descriptors);
+
+    // persist blocks only if append is supported
+    String src = leaseManager.findPath(pendingFile);
+    if (supportAppends) {
+      dir.persistBlocks(src, pendingFile);
+      getEditLog().logSync();
     }
-    return nextGenerationStamp();
+    LOG.info("updatePipeline(" + oldBlock + ") successfully to " + newBlock);
+    return;
   }
 
   // rename was successful. If any part of the renamed subtree had
@@ -3783,4 +4342,38 @@
   DatanodeDescriptor getDatanode(String nodeID) {
     return datanodeMap.get(nodeID);
   }
+
+  /**
+   * Return a range of corrupt replica block ids. Up to numExpectedBlocks 
+   * blocks starting at the next block after startingBlockId are returned
+   * (fewer if numExpectedBlocks blocks are unavailable). If startingBlockId 
+   * is null, up to numExpectedBlocks blocks are returned from the beginning.
+   * If startingBlockId cannot be found, null is returned.
+   *
+   * @param numExpectedBlocks Number of block ids to return.
+   *  0 <= numExpectedBlocks <= 100
+   * @param startingBlockId Block id from which to start. If null, start at
+   *  beginning.
+   * @return Up to numExpectedBlocks blocks from startingBlockId if it exists
+   *
+   */
+  long[] getCorruptReplicaBlockIds(int numExpectedBlocks,
+                                   Long startingBlockId) {  
+    return blockManager.getCorruptReplicaBlockIds(numExpectedBlocks,
+                                                  startingBlockId);
+  }
+
+  public synchronized ArrayList<DatanodeDescriptor> getDecommissioningNodes() {
+    ArrayList<DatanodeDescriptor> decommissioningNodes = 
+        new ArrayList<DatanodeDescriptor>();
+    ArrayList<DatanodeDescriptor> results = 
+        getDatanodeListForReport(DatanodeReportType.LIVE);
+    for (Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
+      DatanodeDescriptor node = it.next();
+      if (node.isDecommissionInProgress()) {
+        decommissioningNodes.add(node);
+      }
+    }
+    return decommissioningNodes;
+  }  
 }

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java Sat Nov 28 20:05:56 2009
@@ -35,6 +35,8 @@
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UnixUserGroupInformation;
@@ -81,8 +83,8 @@
       final XMLOutputter xml = new XMLOutputter(out, "UTF-8");
       xml.declaration();
 
-      final Configuration conf = new Configuration(DataNode.getDataNode().getConf());
-      final int socketTimeout = conf.getInt("dfs.socket.timeout", HdfsConstants.READ_TIMEOUT);
+      final Configuration conf = new HdfsConfiguration(DataNode.getDataNode().getConf());
+      final int socketTimeout = conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsConstants.READ_TIMEOUT);
       final SocketFactory socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
       UnixUserGroupInformation.saveToConf(conf,
           UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
@@ -99,4 +101,4 @@
       xml.endDocument();
     }
   }
-}
\ No newline at end of file
+}

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java Sat Nov 28 20:05:56 2009
@@ -26,6 +26,7 @@
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -48,7 +49,7 @@
     UserGroupInformation.setCurrentUser(ugi);
 
     final ServletContext context = getServletContext();
-    final Configuration conf = new Configuration((Configuration) context.getAttribute("name.conf"));
+    final Configuration conf = new HdfsConfiguration((Configuration) context.getAttribute("name.conf"));
     UnixUserGroupInformation.saveToConf(conf,
         UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
 

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java Sat Nov 28 20:05:56 2009
@@ -25,15 +25,13 @@
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 
 /**
  * We keep an in-memory representation of the file/block hierarchy.
  * This is a base INode class containing common fields for file and 
  * directory inodes.
  */
-abstract class INode implements Comparable<byte[]> {
+abstract class INode implements Comparable<byte[]>, FSInodeInfo {
   protected byte[] name;
   protected INodeDirectory parent;
   protected long modificationTime;
@@ -247,6 +245,12 @@
   }
 
   /** {@inheritDoc} */
+  public String getFullPathName() {
+    // Get the full path name of this inode.
+    return FSDirectory.getFullPathName(this);
+  }
+
+  /** {@inheritDoc} */
   public String toString() {
     return "\"" + getLocalName() + "\":" + getPermissionStatus();
   }
@@ -417,10 +421,4 @@
     }
     return null;
   }
-  
-  
-  LocatedBlocks createLocatedBlocks(List<LocatedBlock> blocks) {
-    return new LocatedBlocks(computeContentSummary().getLength(), blocks,
-        isUnderConstruction());
-  }
 }

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java Sat Nov 28 20:05:56 2009
@@ -88,7 +88,7 @@
    * @param dsQuota diskspace quota to be set
    *                                
    */
-  void setQuota(long newNsQuota, long newDsQuota) throws QuotaExceededException {
+  void setQuota(long newNsQuota, long newDsQuota) {
     nsQuota = newNsQuota;
     dsQuota = newDsQuota;
   }
@@ -116,18 +116,20 @@
    * 
    * @param nsDelta the change of the tree size
    * @param dsDelta change to disk space occupied
-   * @throws QuotaExceededException if the changed size is greater 
-   *                                than the quota
    */
-  void updateNumItemsInTree(long nsDelta, long dsDelta) throws 
-                            QuotaExceededException {
-    long newCount = nsCount + nsDelta;
-    long newDiskspace = diskspace + dsDelta;
-    if (nsDelta>0 || dsDelta>0) {
-      verifyQuota(nsQuota, newCount, dsQuota, newDiskspace);
-    }
-    nsCount = newCount;
-    diskspace = newDiskspace;
+  void updateNumItemsInTree(long nsDelta, long dsDelta) {
+    nsCount += nsDelta;
+    diskspace += dsDelta;
+  }
+  
+  /** Update the size of the tree
+   * 
+   * @param nsDelta the change of the tree size
+   * @param dsDelta change to disk space occupied
+   **/
+  void unprotectedUpdateNumItemsInTree(long nsDelta, long dsDelta) {
+    nsCount = nsCount + nsDelta;
+    diskspace = diskspace + dsDelta;
   }
   
   /** 
@@ -146,14 +148,16 @@
   /** Verify if the namespace count disk space satisfies the quota restriction 
    * @throws QuotaExceededException if the given quota is less than the count
    */
-  private static void verifyQuota(long nsQuota, long nsCount, 
-                                  long dsQuota, long diskspace)
-                                  throws QuotaExceededException {
-    if (nsQuota >= 0 && nsQuota < nsCount) {
-      throw new NSQuotaExceededException(nsQuota, nsCount);
-    }
-    if (dsQuota >= 0 && dsQuota < diskspace) {
-      throw new DSQuotaExceededException(dsQuota, diskspace);
+  void verifyQuota(long nsDelta, long dsDelta) throws QuotaExceededException {
+    long newCount = nsCount + nsDelta;
+    long newDiskspace = diskspace + dsDelta;
+    if (nsDelta>0 || dsDelta>0) {
+      if (nsQuota >= 0 && nsQuota < newCount) {
+        throw new NSQuotaExceededException(nsQuota, newCount);
+      }
+      if (dsQuota >= 0 && dsQuota < newDiskspace) {
+        throw new DSQuotaExceededException(dsQuota, newDiskspace);
+      }
     }
   }
 }

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Sat Nov 28 20:05:56 2009
@@ -88,6 +88,26 @@
   }
 
   /**
+   * append array of blocks to this.blocks
+   */
+  void appendBlocks(INodeFile [] inodes, int totalAddedBlocks) {
+    int size = this.blocks.length;
+    
+    BlockInfo[] newlist = new BlockInfo[size + totalAddedBlocks];
+    System.arraycopy(this.blocks, 0, newlist, 0, size);
+    
+    for(INodeFile in: inodes) {
+      System.arraycopy(in.blocks, 0, newlist, size, in.blocks.length);
+      size += in.blocks.length;
+    }
+    
+    for(BlockInfo bi: this.blocks) {
+      bi.setINode(this);
+    }
+    this.blocks = newlist;
+  }
+  
+  /**
    * add a block to the block list
    */
   void addBlock(BlockInfo newblock) {
@@ -112,8 +132,11 @@
 
   int collectSubtreeBlocksAndClear(List<Block> v) {
     parent = null;
-    for (Block blk : blocks) {
-      v.add(blk);
+    if(blocks != null && v != null) {
+      for (BlockInfo blk : blocks) {
+        v.add(blk);
+        blk.setINode(null);
+      }
     }
     blocks = null;
     return 1;
@@ -121,16 +144,29 @@
 
   /** {@inheritDoc} */
   long[] computeContentSummary(long[] summary) {
-    long bytes = 0;
-    for(Block blk : blocks) {
-      bytes += blk.getNumBytes();
-    }
-    summary[0] += bytes;
+    summary[0] += computeFileSize(true);
     summary[1]++;
     summary[3] += diskspaceConsumed();
     return summary;
   }
 
+  /** Compute file size.
+   * May or may not include BlockInfoUnderConstruction.
+   */
+  long computeFileSize(boolean includesBlockInfoUnderConstruction) {
+    if (blocks == null || blocks.length == 0) {
+      return 0;
+    }
+    final int last = blocks.length - 1;
+    //check if the last block is BlockInfoUnderConstruction
+    long bytes = blocks[last] instanceof BlockInfoUnderConstruction
+                 && !includesBlockInfoUnderConstruction?
+                     0: blocks[last].getNumBytes();
+    for(int i = 0; i < last; i++) {
+      bytes += blocks[i].getNumBytes();
+    }
+    return bytes;
+  }
   
 
   @Override
@@ -146,6 +182,9 @@
   
   long diskspaceConsumed(Block[] blkArr) {
     long size = 0;
+    if(blkArr == null) 
+      return 0;
+    
     for (Block blk : blkArr) {
       if (blk != null) {
         size += blk.getNumBytes();
@@ -172,22 +211,33 @@
   /**
    * Return the penultimate allocated block for this file.
    */
-  Block getPenultimateBlock() {
+  BlockInfo getPenultimateBlock() {
     if (blocks == null || blocks.length <= 1) {
       return null;
     }
     return blocks[blocks.length - 2];
   }
 
-  INodeFileUnderConstruction toINodeFileUnderConstruction(
-      String clientName, String clientMachine, DatanodeDescriptor clientNode
-      ) throws IOException {
-    if (isUnderConstruction()) {
-      return (INodeFileUnderConstruction)this;
-    }
-    return new INodeFileUnderConstruction(name,
-        blockReplication, modificationTime, preferredBlockSize,
-        blocks, getPermissionStatus(),
-        clientName, clientMachine, clientNode);
+  /**
+   * Get the last block of the file.
+   * Make sure it has the right type.
+   */
+  <T extends BlockInfo> T getLastBlock() throws IOException {
+    if (blocks == null || blocks.length == 0)
+      return null;
+    T returnBlock = null;
+    try {
+      @SuppressWarnings("unchecked")  // ClassCastException is caught below
+      T tBlock = (T)blocks[blocks.length - 1];
+      returnBlock = tBlock;
+    } catch(ClassCastException cce) {
+      throw new IOException("Unexpected last block type: " 
+          + blocks[blocks.length - 1].getClass().getSimpleName());
+    }
+    return returnBlock;
+  }
+
+  int numBlocks() {
+    return blocks == null ? 0 : blocks.length;
   }
 }

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java Sat Nov 28 20:05:56 2009
@@ -21,16 +21,13 @@
 
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 
 
 class INodeFileUnderConstruction extends INodeFile {
-  final String clientName;         // lease holder
+  private  String clientName;         // lease holder
   private final String clientMachine;
   private final DatanodeDescriptor clientNode; // if client is a cluster node too.
-
-  private int primaryNodeIndex = -1; //the node working on lease recovery
-  private DatanodeDescriptor[] targets = null;   //locations for last block
-  private long lastRecoveryTime = 0;
   
   INodeFileUnderConstruction(PermissionStatus permissions,
                              short replication,
@@ -67,6 +64,10 @@
     return clientName;
   }
 
+  void setClientName(String clientName) {
+    this.clientName = clientName;
+  }
+
   String getClientMachine() {
     return clientMachine;
   }
@@ -83,15 +84,6 @@
     return true;
   }
 
-  DatanodeDescriptor[] getTargets() {
-    return targets;
-  }
-
-  void setTargets(DatanodeDescriptor[] targets) {
-    this.targets = targets;
-    this.primaryNodeIndex = -1;
-  }
-
   //
   // converts a INodeFileUnderConstruction into a INodeFile
   // use the modification time as the access time
@@ -108,10 +100,10 @@
   }
 
   /**
-   * remove a block from the block list. This block should be
+   * Remove a block from the block list. This block should be
    * the last one on the list.
    */
-  void removeBlock(Block oldblock) throws IOException {
+  void removeLastBlock(Block oldblock) throws IOException {
     if (blocks == null) {
       throw new IOException("Trying to delete non-existant block " + oldblock);
     }
@@ -124,57 +116,24 @@
     BlockInfo[] newlist = new BlockInfo[size_1];
     System.arraycopy(blocks, 0, newlist, 0, size_1);
     blocks = newlist;
-    
-    // Remove the block locations for the last block.
-    targets = null;
-  }
-
-  synchronized void setLastBlock(BlockInfo newblock, DatanodeDescriptor[] newtargets
-      ) throws IOException {
-    if (blocks == null) {
-      throw new IOException("Trying to update non-existant block (newblock="
-          + newblock + ")");
-    }
-    blocks[blocks.length - 1] = newblock;
-    setTargets(newtargets);
-    lastRecoveryTime = 0;
   }
 
   /**
-   * Initialize lease recovery for this object
+   * Convert the last block of the file to an under-construction block.
+   * Set its locations.
    */
-  void assignPrimaryDatanode() {
-    //assign the first alive datanode as the primary datanode
-
-    if (targets.length == 0) {
-      NameNode.stateChangeLog.warn("BLOCK*"
-        + " INodeFileUnderConstruction.initLeaseRecovery:"
-        + " No blocks found, lease removed.");
-    }
-
-    int previous = primaryNodeIndex;
-    //find an alive datanode beginning from previous
-    for(int i = 1; i <= targets.length; i++) {
-      int j = (previous + i)%targets.length;
-      if (targets[j].isAlive) {
-        DatanodeDescriptor primary = targets[primaryNodeIndex = j]; 
-        primary.addBlockToBeRecovered(blocks[blocks.length - 1], targets);
-        NameNode.stateChangeLog.info("BLOCK* " + blocks[blocks.length - 1]
-          + " recovery started, primary=" + primary);
-        return;
-      }
-    }
-  }
-  
-  /**
-   * Update lastRecoveryTime if expired.
-   * @return true if lastRecoveryTimeis updated. 
-   */
-  synchronized boolean setLastRecoveryTime(long now) {
-    boolean expired = now - lastRecoveryTime > NameNode.LEASE_RECOVER_PERIOD;
-    if (expired) {
-      lastRecoveryTime = now;
-    }
-    return expired;
+  BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
+                                          DatanodeDescriptor[] targets)
+  throws IOException {
+    if (blocks == null || blocks.length == 0) {
+      throw new IOException("Trying to update non-existant block. " +
+      		"File is empty.");
+    }
+    BlockInfoUnderConstruction ucBlock =
+      lastBlock.convertToBlockUnderConstruction(
+          BlockUCState.UNDER_CONSTRUCTION, targets);
+    ucBlock.setINode(this);
+    setBlock(numBlocks()-1, ucBlock);
+    return ucBlock;
   }
 }

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java Sat Nov 28 20:05:56 2009
@@ -102,7 +102,7 @@
   /**
    * Adds (or re-adds) the lease for the specified file.
    */
-  synchronized void addLease(String holder, String src) {
+  synchronized Lease addLease(String holder, String src) {
     Lease lease = getLease(holder);
     if (lease == null) {
       lease = new Lease(holder);
@@ -113,6 +113,7 @@
     }
     sortedLeasesByPath.put(src, lease);
     lease.paths.add(src);
+    return lease;
   }
 
   /**
@@ -143,11 +144,22 @@
   }
 
   /**
+   * Reassign lease for file src to the new holder.
+   */
+  synchronized Lease reassignLease(Lease lease, String src, String newHolder) {
+    assert newHolder != null : "new lease holder is null";
+    if (lease != null) {
+      removeLease(lease, src);
+    }
+    return addLease(newHolder, src);
+  }
+
+  /**
    * Finds the pathname for the specified pendingFile
    */
   synchronized String findPath(INodeFileUnderConstruction pendingFile
       ) throws IOException {
-    Lease lease = getLease(pendingFile.clientName);
+    Lease lease = getLease(pendingFile.getClientName());
     if (lease != null) {
       String src = lease.findPath(pendingFile);
       if (src != null) {
@@ -265,7 +277,11 @@
     Collection<String> getPaths() {
       return paths;
     }
-    
+
+    String getHolder() {
+      return holder;
+    }
+
     void replacePath(String oldpath, String newpath) {
       paths.remove(oldpath);
       paths.add(newpath);
@@ -376,7 +392,13 @@
       oldest.getPaths().toArray(leasePaths);
       for(String p : leasePaths) {
         try {
-          fsnamesystem.internalReleaseLease(oldest, p);
+          if(fsnamesystem.internalReleaseLease(oldest, p, "HDFS_NameNode")) {
+            LOG.info("Lease recovery for file " + p +
+                          " is complete. File closed.");
+            removing.add(p);
+          } else
+            LOG.info("Started block recovery for file " + p +
+                          " lease " + oldest);
         } catch (IOException e) {
           LOG.error("Cannot release the path "+p+" in the lease "+oldest, e);
           removing.add(p);



Mime
View raw message