hadoop-common-commits mailing list archives

From omal...@apache.org
Subject svn commit: r1077592 [2/3] - in /hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server: datanode/ namenode/
Date Fri, 04 Mar 2011 04:32:58 GMT
Added: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java.orig
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java.orig?rev=1077592&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java.orig (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java.orig Fri Mar  4 04:32:57 2011
@@ -0,0 +1,5158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.commons.logging.*;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
+import org.apache.hadoop.hdfs.server.namenode.BlocksMap.BlockInfo;
+import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
+import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMetrics;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.util.*;
+import org.apache.hadoop.metrics.util.MBeanUtil;
+import org.apache.hadoop.net.CachedDNSToSwitchMapping;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.ScriptBasedMapping;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
+import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
+import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.Server;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.DataOutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.Map.Entry;
+
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
+/***************************************************
+ * FSNamesystem does the actual bookkeeping work for the
+ * NameNode.
+ *
+ * It tracks several important tables.
+ *
+ * 1)  valid fsname --> blocklist  (kept on disk, logged)
+ * 2)  Set of all valid blocks (inverted #1)
+ * 3)  block --> machinelist (kept in memory, rebuilt dynamically from reports)
+ * 4)  machine --> blocklist (inverted #3)
+ * 5)  LRU cache of updated-heartbeat machines
+ ***************************************************/
+public class FSNamesystem implements FSConstants, FSNamesystemMBean {
+  public static final Log LOG = LogFactory.getLog(FSNamesystem.class);
+  public static final String AUDIT_FORMAT =
+    "ugi=%s\t" +  // ugi
+    "ip=%s\t" +   // remote IP
+    "cmd=%s\t" +  // command
+    "src=%s\t" +  // src path
+    "dst=%s\t" +  // dst path (optional)
+    "perm=%s";    // permissions (optional)
+
+  private static final ThreadLocal<Formatter> auditFormatter =
+    new ThreadLocal<Formatter>() {
+      protected Formatter initialValue() {
+        return new Formatter(new StringBuilder(AUDIT_FORMAT.length() * 4));
+      }
+  };
+
+  private static final void logAuditEvent(UserGroupInformation ugi,
+      InetAddress addr, String cmd, String src, String dst,
+      HdfsFileStatus stat) {
+    final Formatter fmt = auditFormatter.get();
+    ((StringBuilder)fmt.out()).setLength(0);
+    auditLog.info(fmt.format(AUDIT_FORMAT, ugi, addr, cmd, src, dst,
+                  (stat == null)
+                    ? null
+                    : stat.getOwner() + ':' + stat.getGroup() + ':' +
+                      stat.getPermission()
+          ).toString());
+
+  }
+
+  public static final Log auditLog = LogFactory.getLog(
+      FSNamesystem.class.getName() + ".audit");
+
+  // Default initial capacity and load factor of map
+  public static final int DEFAULT_INITIAL_MAP_CAPACITY = 16;
+  public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f;
+
+  private boolean isPermissionEnabled;
+  private UserGroupInformation fsOwner;
+  private String supergroup;
+  private PermissionStatus defaultPermission;
+  // FSNamesystemMetrics counter variables
+  private FSNamesystemMetrics myFSMetrics;
+  private long capacityTotal = 0L, capacityUsed = 0L, capacityRemaining = 0L;
+  private int totalLoad = 0;
+  boolean isAccessTokenEnabled;
+  BlockTokenSecretManager accessTokenHandler;
+  private long accessKeyUpdateInterval;
+  private long accessTokenLifetime;
+  
+  // Scan interval is not configurable.
+  private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL =
+    TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);
+  private DelegationTokenSecretManager dtSecretManager;
+
+  volatile long pendingReplicationBlocksCount = 0L;
+  volatile long corruptReplicaBlocksCount = 0L;
+  volatile long underReplicatedBlocksCount = 0L;
+  volatile long scheduledReplicationBlocksCount = 0L;
+  volatile long excessBlocksCount = 0L;
+  volatile long pendingDeletionBlocksCount = 0L;
+  //
+  // Stores the correct file name hierarchy
+  //
+  public FSDirectory dir;
+
+  //
+  // Mapping: Block -> { INode, datanodes, self ref } 
+  // Updated only in response to client-sent information.
+  //
+  final BlocksMap blocksMap = new BlocksMap(DEFAULT_INITIAL_MAP_CAPACITY, 
+                                            DEFAULT_MAP_LOAD_FACTOR);
+
+  //
+  // Store blocks-->datanodedescriptor(s) map of corrupt replicas
+  //
+  public CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
+    
+  /**
+   * Stores the datanode -> block map.  
+   * <p>
+   * Done by storing a set of {@link DatanodeDescriptor} objects, sorted by 
+   * storage id. In order to keep the storage map consistent it tracks 
+   * all storages ever registered with the namenode.
+   * A descriptor corresponding to a specific storage id can be
+   * <ul> 
+   * <li>added to the map if it is a new storage id;</li>
+   * <li>updated with a new datanode started as a replacement for the old one 
+   * with the same storage id; and </li>
+   * <li>removed if and only if an existing datanode is restarted to serve a
+   * different storage id.</li>
+   * </ul> <br>
+   * The list of the {@link DatanodeDescriptor}s in the map is checkpointed
+   * in the namespace image file. Only the {@link DatanodeInfo} part is 
+   * persistent, the list of blocks is restored from the datanode block
+   * reports. 
+   * <p>
+   * Mapping: StorageID -> DatanodeDescriptor
+   */
+  NavigableMap<String, DatanodeDescriptor> datanodeMap = 
+    new TreeMap<String, DatanodeDescriptor>();
+
+  //
+  // Keeps a Collection for every named machine containing
+  // blocks that have recently been invalidated and are thought to live
+  // on the machine in question.
+  // Mapping: StorageID -> ArrayList<Block>
+  //
+  private Map<String, Collection<Block>> recentInvalidateSets = 
+    new TreeMap<String, Collection<Block>>();
+
+  //
+  // Keeps a TreeSet for every named node.  Each treeset contains
+  // a list of the blocks that are "extra" at that location.  We'll
+  // eventually remove these extras.
+  // Mapping: StorageID -> TreeSet<Block>
+  //
+  Map<String, Collection<Block>> excessReplicateMap = 
+    new TreeMap<String, Collection<Block>>();
+
+  Random r = new Random();
+
+  /**
+   * Stores a set of DatanodeDescriptor objects.
+   * This is a subset of {@link #datanodeMap}, containing nodes that are 
+   * considered alive.
+   * The {@link HeartbeatMonitor} periodically checks for outdated entries,
+   * and removes them from the list.
+   */
+  ArrayList<DatanodeDescriptor> heartbeats = new ArrayList<DatanodeDescriptor>();
+
+  //
+  // Store set of Blocks that need to be replicated 1 or more times.
+  // We also store pending replication-orders.
+  // Set of: Block
+  //
+  private UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
+  private PendingReplicationBlocks pendingReplications;
+
+  public LeaseManager leaseManager = new LeaseManager(this); 
+
+  //
+  // Threaded object that checks to see if we have been
+  // getting heartbeats from all clients. 
+  //
+  Daemon hbthread = null;   // HeartbeatMonitor thread
+  public Daemon lmthread = null;   // LeaseMonitor thread
+  Daemon smmthread = null;  // SafeModeMonitor thread
+  public Daemon replthread = null;  // Replication thread
+  
+  private volatile boolean fsRunning = true;
+  long systemStart = 0;
+
+  //  The maximum number of replicas we should allow for a single block
+  private int maxReplication;
+  //  How many outgoing replication streams a given node should have at one time
+  private int maxReplicationStreams;
+  // MIN_REPLICATION is how many copies we need in place or else we disallow the write
+  private int minReplication;
+  // Default replication
+  private int defaultReplication;
+  // heartbeatRecheckInterval is how often namenode checks for expired datanodes
+  private long heartbeatRecheckInterval;
+  // heartbeatExpireInterval is how long namenode waits for datanode to report
+  // heartbeat
+  private long heartbeatExpireInterval;
+  //replicationRecheckInterval is how often namenode checks for new replication work
+  private long replicationRecheckInterval;
+  // default block size of a file
+  private long defaultBlockSize = 0;
+  // allow appending to hdfs files
+  private boolean supportAppends = true;
+
+  /**
+   * Last block index used for replication work.
+   */
+  private int replIndex = 0;
+  private long missingBlocksInCurIter = 0;
+  private long missingBlocksInPrevIter = 0; 
+
+  public static FSNamesystem fsNamesystemObject;
+  /** NameNode RPC address */
+  private InetSocketAddress nameNodeAddress = null; // TODO: the name-node has this field; it should be removed here
+  private SafeModeInfo safeMode;  // safe mode information
+  private Host2NodesMap host2DataNodeMap = new Host2NodesMap();
+    
+  // datanode network topology
+  NetworkTopology clusterMap = new NetworkTopology();
+  private DNSToSwitchMapping dnsToSwitchMapping;
+  
+  // for block replicas placement
+  ReplicationTargetChooser replicator;
+
+  private HostsFileReader hostsReader; 
+  private Daemon dnthread = null;
+
+  private long maxFsObjects = 0;          // maximum number of fs objects
+
+  /**
+   * The global generation stamp for this file system. 
+   */
+  private final GenerationStamp generationStamp = new GenerationStamp();
+
+  // Ask Datanode only up to this many blocks to delete.
+  private int blockInvalidateLimit = FSConstants.BLOCK_INVALIDATE_CHUNK;
+
+  // precision of access times.
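+  // A precision of 0 (the default) disables access-time updates entirely;
+  // see isAccessTimeSupported() below.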
+  private long accessTimePrecision = 0;
+
+  /**
+   * FSNamesystem constructor.
+   */
+  FSNamesystem(NameNode nn, Configuration conf) throws IOException {
+    try {
+      initialize(nn, conf);
+    } catch(IOException e) {
+      LOG.error(getClass().getSimpleName() + " initialization failed.", e);
+      close();
+      throw e;
+    }
+  }
+
+  void activateSecretManager() throws IOException {
+    if (dtSecretManager != null) {
+      dtSecretManager.startThreads();
+    }
+  }
+
+  /**
+   * Initialize FSNamesystem.
+   */
+  private void initialize(NameNode nn, Configuration conf) throws IOException {
+    this.systemStart = now();
+    setConfigurationParameters(conf);
+    dtSecretManager = createDelegationTokenSecretManager(conf);
+
+    this.nameNodeAddress = nn.getNameNodeAddress();
+    this.registerMBean(conf); // register the MBean for the FSNamesystemStatus
+    this.dir = new FSDirectory(this, conf);
+    StartupOption startOpt = NameNode.getStartupOption(conf);
+    this.dir.loadFSImage(getNamespaceDirs(conf),
+                         getNamespaceEditsDirs(conf), startOpt);
+    long timeTakenToLoadFSImage = now() - systemStart;
+    LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
+    NameNode.getNameNodeMetrics().fsImageLoadTime.set(
+                              (int) timeTakenToLoadFSImage);
+    this.safeMode = new SafeModeInfo(conf);
+    setBlockTotal();
+    pendingReplications = new PendingReplicationBlocks(
+                            conf.getInt("dfs.replication.pending.timeout.sec", 
+                                        -1) * 1000L);
+    if (isAccessTokenEnabled) {
+      accessTokenHandler = new BlockTokenSecretManager(true,
+          accessKeyUpdateInterval, accessTokenLifetime);
+    }
+    this.hbthread = new Daemon(new HeartbeatMonitor());
+    this.lmthread = new Daemon(leaseManager.new Monitor());
+    this.replthread = new Daemon(new ReplicationMonitor());
+    hbthread.start();
+    lmthread.start();
+    replthread.start();
+
+    this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
+                                           conf.get("dfs.hosts.exclude",""));
+    this.dnthread = new Daemon(new DecommissionManager(this).new Monitor(
+        conf.getInt("dfs.namenode.decommission.interval", 30),
+        conf.getInt("dfs.namenode.decommission.nodes.per.interval", 5)));
+    dnthread.start();
+
+    this.dnsToSwitchMapping = ReflectionUtils.newInstance(
+        conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
+            DNSToSwitchMapping.class), conf);
+    
+    /* If the DNS-to-switch mapping supports caching, resolve the network
+     * locations of the hosts in the include list
+     * and store the mapping in the cache, so that future calls to resolve
+     * will be fast.
+     */
+    if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
+      dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts()));
+    }
+  }
+
+  public static Collection<File> getNamespaceDirs(Configuration conf) {
+    Collection<String> dirNames = conf.getStringCollection("dfs.name.dir");
+    if (dirNames.isEmpty())
+      dirNames.add("/tmp/hadoop/dfs/name");
+    Collection<File> dirs = new ArrayList<File>(dirNames.size());
+    for(String name : dirNames) {
+      dirs.add(new File(name));
+    }
+    return dirs;
+  }
+  
+  public static Collection<File> getNamespaceEditsDirs(Configuration conf) {
+    Collection<String> editsDirNames = 
+            conf.getStringCollection("dfs.name.edits.dir");
+    if (editsDirNames.isEmpty())
+      editsDirNames.add("/tmp/hadoop/dfs/name");
+    Collection<File> dirs = new ArrayList<File>(editsDirNames.size());
+    for(String name : editsDirNames) {
+      dirs.add(new File(name));
+    }
+    return dirs;
+  }
+
+  /**
+   * dirs is a list of directories where the filesystem directory state 
+   * is stored
+   */
+  FSNamesystem(FSImage fsImage, Configuration conf) throws IOException {
+    setConfigurationParameters(conf);
+    this.dir = new FSDirectory(fsImage, this, conf);
+    dtSecretManager = createDelegationTokenSecretManager(conf);
+  }
+
+  /**
+   * Initializes some of the members from configuration
+   */
+  private void setConfigurationParameters(Configuration conf) 
+                                          throws IOException {
+    fsNamesystemObject = this;
+    fsOwner = UserGroupInformation.getCurrentUser();
+    LOG.info("fsOwner=" + fsOwner);
+
+    this.supergroup = conf.get("dfs.permissions.supergroup", "supergroup");
+    this.isPermissionEnabled = conf.getBoolean("dfs.permissions", true);
+    LOG.info("supergroup=" + supergroup);
+    LOG.info("isPermissionEnabled=" + isPermissionEnabled);
+    short filePermission = (short)conf.getInt("dfs.upgrade.permission", 0777);
+    this.defaultPermission = PermissionStatus.createImmutable(
+        fsOwner.getShortUserName(), supergroup, new FsPermission(filePermission));
+
+
+    this.replicator = new ReplicationTargetChooser(
+                         conf.getBoolean("dfs.replication.considerLoad", true),
+                         this,
+                         clusterMap);
+    this.defaultReplication = conf.getInt("dfs.replication", 3);
+    this.maxReplication = conf.getInt("dfs.replication.max", 512);
+    this.minReplication = conf.getInt("dfs.replication.min", 1);
+    if (minReplication <= 0)
+      throw new IOException(
+                            "Unexpected configuration parameters: dfs.replication.min = " 
+                            + minReplication
+                            + " must be greater than 0");
+    if (maxReplication >= (int)Short.MAX_VALUE)
+      throw new IOException(
+                            "Unexpected configuration parameters: dfs.replication.max = " 
+                            + maxReplication + " must be less than " + (Short.MAX_VALUE));
+    if (maxReplication < minReplication)
+      throw new IOException(
+                            "Unexpected configuration parameters: dfs.replication.min = " 
+                            + minReplication
+                            + " must be less than dfs.replication.max = " 
+                            + maxReplication);
+    this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
+    long heartbeatInterval = conf.getLong("dfs.heartbeat.interval", 3) * 1000;
+    this.heartbeatRecheckInterval = conf.getInt(
+        "heartbeat.recheck.interval", 5 * 60 * 1000); // 5 minutes
+    this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval +
+      10 * heartbeatInterval;
+    this.replicationRecheckInterval = 
+      conf.getInt("dfs.replication.interval", 3) * 1000L;
+    this.defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+    this.maxFsObjects = conf.getLong("dfs.max.objects", 0);
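+    // Allow invalidating at least 20 blocks per second of heartbeat interval
+    // (60 with the default 3 s interval).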
+    this.blockInvalidateLimit = Math.max(this.blockInvalidateLimit, 
+                                         20*(int)(heartbeatInterval/1000));
+    this.accessTimePrecision = conf.getLong("dfs.access.time.precision", 0);
+    this.supportAppends = conf.getBoolean("dfs.support.append", false);
+    this.isAccessTokenEnabled = conf.getBoolean(
+        DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, false);
+    if (isAccessTokenEnabled) {
+      this.accessKeyUpdateInterval = conf.getLong(
+          DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_KEY, 600) * 60 * 1000L; // 10 hrs
+      this.accessTokenLifetime = conf.getLong(
+          DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY, 600) * 60 * 1000L; // 10 hrs
+    }
+    LOG.info("isAccessTokenEnabled=" + isAccessTokenEnabled
+        + " accessKeyUpdateInterval=" + accessKeyUpdateInterval / (60 * 1000)
+        + " min(s), accessTokenLifetime=" + accessTokenLifetime / (60 * 1000)
+        + " min(s)");
+  }
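+
+  /* For reference, the tunables read above correspond to hdfs-site.xml
+   * entries such as the following sketch (the values shown are simply the
+   * built-in defaults from setConfigurationParameters):
+   *
+   *   <property><name>dfs.replication</name><value>3</value></property>
+   *   <property><name>dfs.replication.min</name><value>1</value></property>
+   *   <property><name>dfs.replication.max</name><value>512</value></property>
+   *   <property><name>dfs.heartbeat.interval</name><value>3</value></property>
+   *   <property><name>dfs.support.append</name><value>false</value></property>
+   */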
+
+  /**
+   * Return the default path permission when upgrading from releases with no
+   * permissions (<=0.15) to releases with permissions (>=0.16)
+   */
+  protected PermissionStatus getUpgradePermission() {
+    return defaultPermission;
+  }
+  
+  /** Return the FSNamesystem object. */
+  public static FSNamesystem getFSNamesystem() {
+    return fsNamesystemObject;
+  } 
+
+  NamespaceInfo getNamespaceInfo() {
+    return new NamespaceInfo(dir.fsImage.getNamespaceID(),
+                             dir.fsImage.getCTime(),
+                             getDistributedUpgradeVersion());
+  }
+
+  /**
+   * Close down this file system manager.
+   * Causes heartbeat and lease daemons to stop; waits briefly for
+   * them to finish, but a short timeout returns control back to caller.
+   */
+  public void close() {
+    fsRunning = false;
+    try {
+      if (pendingReplications != null) pendingReplications.stop();
+      if (hbthread != null) hbthread.interrupt();
+      if (replthread != null) replthread.interrupt();
+      if (dnthread != null) dnthread.interrupt();
+      if (smmthread != null) smmthread.interrupt();
+      if (dtSecretManager != null) dtSecretManager.stopThreads();
+    } catch (Exception e) {
+      LOG.warn("Exception shutting down FSNamesystem", e);
+    } finally {
+      // using finally to ensure we also wait for lease daemon
+      try {
+        if (lmthread != null) {
+          lmthread.interrupt();
+          lmthread.join(3000);
+        }
+        dir.close();
+        blocksMap.close();
+      } catch (InterruptedException ie) {
+      } catch (IOException ie) {
+        LOG.error("Error closing FSDirectory", ie);
+        IOUtils.cleanup(LOG, dir);
+      }
+    }
+  }
+
+  /** Is this name system running? */
+  boolean isRunning() {
+    return fsRunning;
+  }
+
+  /**
+   * Dump all metadata into the specified file.
+   */
+  synchronized void metaSave(String filename) throws IOException {
+    checkSuperuserPrivilege();
+    File file = new File(System.getProperty("hadoop.log.dir"), 
+                         filename);
+    PrintWriter out = new PrintWriter(new BufferedWriter(
+                                                         new FileWriter(file, true)));
+ 
+    long totalInodes = this.dir.totalInodes();
+    long totalBlocks = this.getBlocksTotal();
+
+    ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
+    ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
+    this.DFSNodesStatus(live, dead);
+    
+    String str = totalInodes + " files and directories, " + totalBlocks
+        + " blocks = " + (totalInodes + totalBlocks) + " total";
+    out.println(str);
+    out.println("Live Datanodes: "+live.size());
+    out.println("Dead Datanodes: "+dead.size());
+
+    //
+    // Dump contents of neededReplication
+    //
+    synchronized (neededReplications) {
+      out.println("Metasave: Blocks waiting for replication: " + 
+                  neededReplications.size());
+      for (Block block : neededReplications) {
+        List<DatanodeDescriptor> containingNodes =
+                                          new ArrayList<DatanodeDescriptor>();
+        NumberReplicas numReplicas = new NumberReplicas();
+        // source node returned is not used
+        chooseSourceDatanode(block, containingNodes, numReplicas);
+        int usableReplicas = numReplicas.liveReplicas() + 
+                             numReplicas.decommissionedReplicas(); 
+
+        if (block instanceof BlockInfo) {
+          String fileName = FSDirectory.getFullPathName(((BlockInfo) block)
+              .getINode());
+          out.print(fileName + ": ");
+        }
+
+        // l: == live, d: == decommissioned, c: == corrupt, e: == excess
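+        // e.g. "/user/alice/f: blk_-123_42 MISSING (replicas: l: 0 d: 0 c: 1 e: 0)"  (hypothetical values)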
+        out.print(block + ((usableReplicas > 0)? "" : " MISSING") +
+                  " (replicas:" +
+                  " l: " + numReplicas.liveReplicas() + 
+                  " d: " + numReplicas.decommissionedReplicas() + 
+                  " c: " + numReplicas.corruptReplicas() + 
+                  " e: " + numReplicas.excessReplicas() + ") ");
+
+        Collection<DatanodeDescriptor> corruptNodes =
+                                       corruptReplicas.getNodes(block);
+
+        for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
+             jt.hasNext();) {
+          DatanodeDescriptor node = jt.next();
+          String state = "";
+          if (corruptNodes != null && corruptNodes.contains(node)) {
+            state = "(corrupt)";
+          } else if (node.isDecommissioned() ||
+                     node.isDecommissionInProgress()) {
+            state = "(decommissioned)";
+          }
+          out.print(" " + node + state + " : ");
+        }
+        out.println("");
+      }
+    }
+
+    //
+    // Dump blocks from pendingReplication
+    //
+    pendingReplications.metaSave(out);
+
+    //
+    // Dump blocks that are waiting to be deleted
+    //
+    dumpRecentInvalidateSets(out);
+
+    //
+    // Dump all datanodes
+    //
+    datanodeDump(out);
+
+    out.flush();
+    out.close();
+  }
+
+  long getDefaultBlockSize() {
+    return defaultBlockSize;
+  }
+
+  long getAccessTimePrecision() {
+    return accessTimePrecision;
+  }
+
+  private boolean isAccessTimeSupported() {
+    return accessTimePrecision > 0;
+  }
+    
+  /* get replication factor of a block */
+  private int getReplication(Block block) {
+    INodeFile fileINode = blocksMap.getINode(block);
+    if (fileINode == null) { // block does not belong to any file
+      return 0;
+    }
+    assert !fileINode.isDirectory() : "Block cannot belong to a directory.";
+    return fileINode.getReplication();
+  }
+
+  /* updates a block in under replication queue */
+  synchronized void updateNeededReplications(Block block,
+                        int curReplicasDelta, int expectedReplicasDelta) {
+    NumberReplicas repl = countNodes(block);
+    int curExpectedReplicas = getReplication(block);
+    neededReplications.update(block, 
+                              repl.liveReplicas(), 
+                              repl.decommissionedReplicas(),
+                              curExpectedReplicas,
+                              curReplicasDelta, expectedReplicasDelta);
+  }
+
+  /////////////////////////////////////////////////////////
+  //
+  // These methods are called by secondary namenodes
+  //
+  /////////////////////////////////////////////////////////
+  /**
+   * Return a list of blocks and their locations on <code>datanode</code>
+   * whose total size is <code>size</code>
+   * 
+   * @param datanode on which blocks are located
+   * @param size total size of blocks
+   */
+  synchronized BlocksWithLocations getBlocks(DatanodeID datanode, long size)
+      throws IOException {
+    checkSuperuserPrivilege();
+
+    DatanodeDescriptor node = getDatanode(datanode);
+    if (node == null) {
+      NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: "
+          + "Asking for blocks from an unrecorded node " + datanode.getName());
+      throw new IllegalArgumentException(
+          "Unexpected exception.  Got getBlocks message for datanode " + 
+          datanode.getName() + ", but there is no info for it");
+    }
+
+    int numBlocks = node.numBlocks();
+    if(numBlocks == 0) {
+      return new BlocksWithLocations(new BlockWithLocations[0]);
+    }
+    Iterator<Block> iter = node.getBlockIterator();
+    int startBlock = r.nextInt(numBlocks); // starting from a random block
+    // skip blocks
+    for(int i=0; i<startBlock; i++) {
+      iter.next();
+    }
+    List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
+    long totalSize = 0;
+    while(totalSize<size && iter.hasNext()) {
+      totalSize += addBlock(iter.next(), results);
+    }
+    if(totalSize<size) {
+      iter = node.getBlockIterator(); // start from the beginning
+      for(int i=0; i<startBlock&&totalSize<size; i++) {
+        totalSize += addBlock(iter.next(), results);
+      }
+    }
+    
+    return new BlocksWithLocations(
+        results.toArray(new BlockWithLocations[results.size()]));
+  }
+  
+  /**
+   * Get access keys
+   * 
+   * @return current access keys
+   */
+  ExportedBlockKeys getBlockKeys() {
+    return isAccessTokenEnabled ? accessTokenHandler.exportKeys()
+        : ExportedBlockKeys.DUMMY_KEYS;
+  }
+
+  /**
+   * Get all valid locations of the block & add the block to results
+   * return the length of the added block; 0 if the block is not added
+   */
+  private long addBlock(Block block, List<BlockWithLocations> results) {
+    ArrayList<String> machineSet =
+      new ArrayList<String>(blocksMap.numNodes(block));
+    for(Iterator<DatanodeDescriptor> it = 
+      blocksMap.nodeIterator(block); it.hasNext();) {
+      String storageID = it.next().getStorageID();
+      // skip replicas that are scheduled for invalidation
+      Collection<Block> blocks = recentInvalidateSets.get(storageID); 
+      if(blocks==null || !blocks.contains(block)) {
+        machineSet.add(storageID);
+      }
+    }
+    if(machineSet.size() == 0) {
+      return 0;
+    } else {
+      results.add(new BlockWithLocations(block, 
+          machineSet.toArray(new String[machineSet.size()])));
+      return block.getNumBytes();
+    }
+  }
+
+  /////////////////////////////////////////////////////////
+  //
+  // These methods are called by HadoopFS clients
+  //
+  /////////////////////////////////////////////////////////
+  /**
+   * Set permissions for an existing file.
+   * @throws IOException
+   */
+  public synchronized void setPermission(String src, FsPermission permission
+      ) throws IOException {
+    if (isInSafeMode())
+       throw new SafeModeException("Cannot set permission for " + src, safeMode);
+    checkOwner(src);
+    dir.setPermission(src, permission);
+    getEditLog().logSync();
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      final HdfsFileStatus stat = dir.getFileInfo(src);
+      logAuditEvent(UserGroupInformation.getCurrentUser(),
+                    Server.getRemoteIp(),
+                    "setPermission", src, null, stat);
+    }
+  }
+
+  /**
+   * Set owner for an existing file.
+   * @throws IOException
+   */
+  public synchronized void setOwner(String src, String username, String group
+      ) throws IOException {
+    if (isInSafeMode())
+       throw new SafeModeException("Cannot set owner for " + src, safeMode);
+    FSPermissionChecker pc = checkOwner(src);
+    if (!pc.isSuper) {
+      if (username != null && !pc.user.equals(username)) {
+        throw new AccessControlException("Non-super user cannot change owner.");
+      }
+      if (group != null && !pc.containsGroup(group)) {
+        throw new AccessControlException("User does not belong to " + group
+            + " .");
+      }
+    }
+    dir.setOwner(src, username, group);
+    getEditLog().logSync();
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      final HdfsFileStatus stat = dir.getFileInfo(src);
+      logAuditEvent(UserGroupInformation.getCurrentUser(),
+                    Server.getRemoteIp(),
+                    "setOwner", src, null, stat);
+    }
+  }
+
+  /**
+   * Get block locations within the specified range.
+   * 
+   * @see #getBlockLocations(String, long, long)
+   */
+  LocatedBlocks getBlockLocations(String clientMachine, String src,
+      long offset, long length) throws IOException {
+    LocatedBlocks blocks = getBlockLocations(src, offset, length, true, true);
+    if (blocks != null) {
+      //sort the blocks
+      DatanodeDescriptor client = host2DataNodeMap.getDatanodeByHost(
+          clientMachine);
+      for (LocatedBlock b : blocks.getLocatedBlocks()) {
+        clusterMap.pseudoSortByDistance(client, b.getLocations());
+      }
+    }
+    return blocks;
+  }
+
+  /**
+   * Get block locations within the specified range.
+   * @see ClientProtocol#getBlockLocations(String, long, long)
+   */
+  public LocatedBlocks getBlockLocations(String src, long offset, long length
+      ) throws IOException {
+    return getBlockLocations(src, offset, length, false, true);
+  }
+
+  /**
+   * Get block locations within the specified range.
+   * @see ClientProtocol#getBlockLocations(String, long, long)
+   */
+  public LocatedBlocks getBlockLocations(String src, long offset, long length,
+      boolean doAccessTime, boolean needBlockToken) throws IOException {
+    if (isPermissionEnabled) {
+      checkPathAccess(src, FsAction.READ);
+    }
+
+    if (offset < 0) {
+      throw new IOException("Negative offset is not supported. File: " + src );
+    }
+    if (length < 0) {
+      throw new IOException("Negative length is not supported. File: " + src );
+    }
+    final LocatedBlocks ret = getBlockLocationsInternal(src, 
+        offset, length, Integer.MAX_VALUE, doAccessTime, needBlockToken);  
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      logAuditEvent(UserGroupInformation.getCurrentUser(),
+                    Server.getRemoteIp(),
+                    "open", src, null, null);
+    }
+    return ret;
+  }
+
+  private synchronized LocatedBlocks getBlockLocationsInternal(String src,
+                                                       long offset, 
+                                                       long length,
+                                                       int nrBlocksToReturn,
+                                                       boolean doAccessTime, 
+                                                       boolean needBlockToken)
+                                                       throws IOException {
+    INodeFile inode = dir.getFileINode(src);
+    if(inode == null) {
+      return null;
+    }
+    if (doAccessTime && isAccessTimeSupported()) {
+      dir.setTimes(src, inode, -1, now(), false);
+    }
+    Block[] blocks = inode.getBlocks();
+    if (blocks == null) {
+      return null;
+    }
+    if (blocks.length == 0) {
+      return inode.createLocatedBlocks(new ArrayList<LocatedBlock>(blocks.length));
+    }
+    List<LocatedBlock> results;
+    results = new ArrayList<LocatedBlock>(blocks.length);
+
+    int curBlk = 0;
+    long curPos = 0, blkSize = 0;
+    int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
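+    // Locate the first block whose byte range contains 'offset'; a
+    // zero-length first block means no data has been written yet.
+    // After the loop, curPos is the starting offset of block curBlk.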
+    for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
+      blkSize = blocks[curBlk].getNumBytes();
+      assert blkSize > 0 : "Block of size 0";
+      if (curPos + blkSize > offset) {
+        break;
+      }
+      curPos += blkSize;
+    }
+    
+    if (nrBlocks > 0 && curBlk == nrBlocks)   // offset >= end of file
+      return null;
+    
+    long endOff = offset + length;
+    
+    do {
+      // get block locations
+      int numNodes = blocksMap.numNodes(blocks[curBlk]);
+      int numCorruptNodes = countNodes(blocks[curBlk]).corruptReplicas();
+      int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blocks[curBlk]); 
+      if (numCorruptNodes != numCorruptReplicas) {
+        LOG.warn("Inconsistent number of corrupt replicas for " + 
+            blocks[curBlk] + "blockMap has " + numCorruptNodes + 
+            " but corrupt replicas map has " + numCorruptReplicas);
+      }
+      boolean blockCorrupt = (numCorruptNodes == numNodes);
+      int numMachineSet = blockCorrupt ? numNodes : 
+                            (numNodes - numCorruptNodes);
+      DatanodeDescriptor[] machineSet = new DatanodeDescriptor[numMachineSet];
+      if (numMachineSet > 0) {
+        numNodes = 0;
+        for(Iterator<DatanodeDescriptor> it = 
+            blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
+          DatanodeDescriptor dn = it.next();
+          boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blocks[curBlk], dn);
+          if (blockCorrupt || !replicaCorrupt)
+            machineSet[numNodes++] = dn;
+        }
+      }
+      LocatedBlock b = new LocatedBlock(blocks[curBlk], machineSet, curPos,
+          blockCorrupt);
+
+      results.add(b); 
+      curPos += blocks[curBlk].getNumBytes();
+      curBlk++;
+    } while (curPos < endOff 
+          && curBlk < blocks.length 
+          && results.size() < nrBlocksToReturn);
+    
+    if(isAccessTokenEnabled && needBlockToken) {
+      // Generate a list of the blockIds to be returned for this request
+      long [] blockIds = new long[results.size()];
+      for(int i = 0; i < results.size(); i++) {
+        blockIds[i] = results.get(i).getBlock().getBlockId();
+      }
+      
+      // Generate a single BlockTokenIdentifier for all ids
+      Token<BlockTokenIdentifier> bti 
+        = accessTokenHandler.generateToken(blockIds, 
+            EnumSet.of(BlockTokenSecretManager.AccessMode.READ));
+      
+      // Assign a reference to this BlockTokenIdentifier to all blocks
+      for(LocatedBlock lb : results) {
+        lb.setBlockToken(bti);
+      }
+    }
+    
+    return inode.createLocatedBlocks(results);
+  }
+
+  /**
+   * Stores the modification and access time for this inode. 
+   * The access time is precise up to an hour. The transaction, if needed, is
+   * written to the edits log but is not flushed.
+   */
+  public synchronized void setTimes(String src, long mtime, long atime) throws IOException {
+    if (!isAccessTimeSupported() && atime != -1) {
+      throw new IOException("Access time for hdfs is not configured. " +
+                            " Please set dfs.support.accessTime configuration parameter.");
+    }
+    //
+    // The caller needs to have write access to set access & modification times.
+    if (isPermissionEnabled) {
+      checkPathAccess(src, FsAction.WRITE);
+    }
+    INodeFile inode = dir.getFileINode(src);
+    if (inode != null) {
+      dir.setTimes(src, inode, mtime, atime, true);
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        final HdfsFileStatus stat = dir.getFileInfo(src);
+        logAuditEvent(UserGroupInformation.getCurrentUser(),
+                      Server.getRemoteIp(),
+                      "setTimes", src, null, stat);
+      }
+    } else {
+      throw new FileNotFoundException("File " + src + " does not exist.");
+    }
+  }
+
+  /**
+   * Set replication for an existing file.
+   * 
+   * The NameNode sets new replication and schedules either replication of 
+   * under-replicated data blocks or removal of the excessive block copies 
+   * if the blocks are over-replicated.
+   * 
+   * @see ClientProtocol#setReplication(String, short)
+   * @param src file name
+   * @param replication new replication
+   * @return true if successful; 
+   *         false if file does not exist or is a directory
+   */
+  public boolean setReplication(String src, short replication) 
+                                throws IOException {
+    boolean status = setReplicationInternal(src, replication);
+    getEditLog().logSync();
+    if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
+      logAuditEvent(UserGroupInformation.getCurrentUser(),
+                    Server.getRemoteIp(),
+                    "setReplication", src, null, null);
+    }
+    return status;
+  }
+
+  private synchronized boolean setReplicationInternal(String src, 
+                                             short replication
+                                             ) throws IOException {
+    if (isInSafeMode())
+      throw new SafeModeException("Cannot set replication for " + src, safeMode);
+    verifyReplication(src, replication, null);
+    if (isPermissionEnabled) {
+      checkPathAccess(src, FsAction.WRITE);
+    }
+
+    int[] oldReplication = new int[1];
+    Block[] fileBlocks;
+    fileBlocks = dir.setReplication(src, replication, oldReplication);
+    if (fileBlocks == null)  // file not found or is a directory
+      return false;
+    int oldRepl = oldReplication[0];
+    if (oldRepl == replication) // the same replication
+      return true;
+
+    // update needReplication priority queues
+    for(int idx = 0; idx < fileBlocks.length; idx++)
+      updateNeededReplications(fileBlocks[idx], 0, replication-oldRepl);
+      
+    if (oldRepl > replication) {  
+      // old replication > the new one; need to remove copies
+      LOG.info("Reducing replication for file " + src 
+               + ". New replication is " + replication);
+      for(int idx = 0; idx < fileBlocks.length; idx++)
+        processOverReplicatedBlock(fileBlocks[idx], replication, null, null);
+    } else { // replication factor is increased
+      LOG.info("Increasing replication for file " + src 
+          + ". New replication is " + replication);
+    }
+    return true;
+  }
+    
+  long getPreferredBlockSize(String filename) throws IOException {
+    if (isPermissionEnabled) {
+      checkTraverse(filename);
+    }
+    return dir.getPreferredBlockSize(filename);
+  }
+    
+  /**
+   * Check whether the replication parameter is within the range
+   * determined by system configuration.
+   */
+  private void verifyReplication(String src, 
+                                 short replication, 
+                                 String clientName 
+                                 ) throws IOException {
+    String text = "file " + src 
+      + ((clientName != null) ? " on client " + clientName : "")
+      + ".\n"
+      + "Requested replication " + replication;
+
+    if (replication > maxReplication)
+      throw new IOException(text + " exceeds maximum " + maxReplication);
+      
+    if (replication < minReplication)
+      throw new IOException( 
+                            text + " is less than the required minimum " + minReplication);
+  }
+
+  /**
+   * Create a new file entry in the namespace.
+   * 
+   * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
+   * 
+   * @throws IOException if file name is invalid
+   *         {@link FSDirectory#isValidToCreate(String)}.
+   */
+  void startFile(String src, PermissionStatus permissions,
+                 String holder, String clientMachine,
+                 boolean overwrite, short replication, long blockSize
+                ) throws IOException {
+    startFileInternal(src, permissions, holder, clientMachine, overwrite, false,
+                      replication, blockSize);
+    getEditLog().logSync();
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      final HdfsFileStatus stat = dir.getFileInfo(src);
+      logAuditEvent(UserGroupInformation.getCurrentUser(),
+                    Server.getRemoteIp(),
+                    "create", src, null, stat);
+    }
+  }
+
+  private synchronized void startFileInternal(String src,
+                                              PermissionStatus permissions,
+                                              String holder, 
+                                              String clientMachine, 
+                                              boolean overwrite,
+                                              boolean append,
+                                              short replication,
+                                              long blockSize
+                                              ) throws IOException {
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src
+          + ", holder=" + holder
+          + ", clientMachine=" + clientMachine
+          + ", replication=" + replication
+          + ", overwrite=" + overwrite
+          + ", append=" + append);
+    }
+
+    if (isInSafeMode())
+      throw new SafeModeException("Cannot create file" + src, safeMode);
+    if (!DFSUtil.isValidName(src)) {
+      throw new IOException("Invalid file name: " + src);
+    }
+
+    // Verify that the destination does not exist as a directory already.
+    boolean pathExists = dir.exists(src);
+    if (pathExists && dir.isDir(src)) {
+      throw new IOException("Cannot create file "+ src + "; already exists as a directory.");
+    }
+
+    if (isPermissionEnabled) {
+      if (append || (overwrite && pathExists)) {
+        checkPathAccess(src, FsAction.WRITE);
+      }
+      else {
+        checkAncestorAccess(src, FsAction.WRITE);
+      }
+    }
+
+    try {
+      INode myFile = dir.getFileINode(src);
+      if (myFile != null && myFile.isUnderConstruction()) {
+        INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) myFile;
+        //
+        // If the file is under construction, then it must be in our
+        // leases. Find the appropriate lease record.
+        //
+        Lease lease = leaseManager.getLease(holder);
+        //
+        // We found the lease for this file. And surprisingly the original
+        // holder is trying to recreate this file. This should never occur.
+        //
+        if (lease != null) {
+          throw new AlreadyBeingCreatedException(
+                                                 "failed to create file " + src + " for " + holder +
+                                                 " on client " + clientMachine + 
+                                                 " because current leaseholder is trying to recreate file.");
+        }
+        //
+        // Find the original holder.
+        //
+        lease = leaseManager.getLease(pendingFile.clientName);
+        if (lease == null) {
+          throw new AlreadyBeingCreatedException(
+                                                 "failed to create file " + src + " for " + holder +
+                                                 " on client " + clientMachine + 
+                                                 " because pendingCreates is non-null but no leases found.");
+        }
+        //
+        // If the original holder has not renewed in the last SOFTLIMIT 
+        // period, then start lease recovery.
+        //
+        if (lease.expiredSoftLimit()) {
+          LOG.info("startFile: recover lease " + lease + ", src=" + src);
+          internalReleaseLease(lease, src);
+        }
+        throw new AlreadyBeingCreatedException("failed to create file " + src + " for " + holder +
+                                               " on client " + clientMachine + 
+                                               ", because this file is already being created by " +
+                                               pendingFile.getClientName() + 
+                                               " on " + pendingFile.getClientMachine());
+      }
+
+      try {
+        verifyReplication(src, replication, clientMachine);
+      } catch(IOException e) {
+        throw new IOException("failed to create "+e.getMessage());
+      }
+      if (append) {
+        if (myFile == null) {
+          throw new FileNotFoundException("failed to append to non-existent file "
+              + src + " on client " + clientMachine);
+        } else if (myFile.isDirectory()) {
+          throw new IOException("failed to append to directory " + src 
+                                +" on client " + clientMachine);
+        }
+      } else if (!dir.isValidToCreate(src)) {
+        if (overwrite) {
+          delete(src, true);
+        } else {
+          throw new IOException("failed to create file " + src 
+                                +" on client " + clientMachine
+                                +" either because the filename is invalid or the file exists");
+        }
+      }
+
+      DatanodeDescriptor clientNode = 
+        host2DataNodeMap.getDatanodeByHost(clientMachine);
+
+      if (append) {
+        //
+        // Replace current node with a INodeUnderConstruction.
+        // Recreate in-memory lease record.
+        //
+        INodeFile node = (INodeFile) myFile;
+        INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
+                                        node.getLocalNameBytes(),
+                                        node.getReplication(),
+                                        node.getModificationTime(),
+                                        node.getPreferredBlockSize(),
+                                        node.getBlocks(),
+                                        node.getPermissionStatus(),
+                                        holder,
+                                        clientMachine,
+                                        clientNode);
+        dir.replaceNode(src, node, cons);
+        leaseManager.addLease(cons.clientName, src);
+
+      } else {
+        // Now we can add the name to the filesystem. This file has no
+        // blocks associated with it.
+        //
+        checkFsObjectLimit();
+
+        // increment global generation stamp
+        long genstamp = nextGenerationStamp();
+        INodeFileUnderConstruction newNode = dir.addFile(src, permissions,
+            replication, blockSize, holder, clientMachine, clientNode, genstamp);
+        if (newNode == null) {
+          throw new IOException("DIR* NameSystem.startFile: " +
+                                "Unable to add file to namespace.");
+        }
+        leaseManager.addLease(newNode.clientName, src);
+        if (NameNode.stateChangeLog.isDebugEnabled()) {
+          NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
+                                     +"add "+src+" to namespace for "+holder);
+        }
+      }
+    } catch (IOException ie) {
+      NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
+                                   +ie.getMessage());
+      throw ie;
+    }
+  }
+
+  /**
+   * Append to an existing file in the namespace.
+   */
+  LocatedBlock appendFile(String src, String holder, String clientMachine
+      ) throws IOException {
+    if (!supportAppends) {
+      throw new IOException("Append to hdfs not supported. " +
+                            "Please refer to the dfs.support.append configuration parameter.");
+    }
+    startFileInternal(src, null, holder, clientMachine, false, true, 
+                      (short)maxReplication, (long)0);
+    getEditLog().logSync();
+
+    //
+    // Create a LocatedBlock object for the last block of the file
+    // to be returned to the client. Return null if the file does not
+    // have a partial block at the end.
+    //
+    LocatedBlock lb = null;
+    synchronized (this) {
+      INodeFileUnderConstruction file = (INodeFileUnderConstruction)dir.getFileINode(src);
+
+      Block[] blocks = file.getBlocks();
+      if (blocks != null && blocks.length > 0) {
+        Block last = blocks[blocks.length-1];
+        BlockInfo storedBlock = blocksMap.getStoredBlock(last);
+        if (file.getPreferredBlockSize() > storedBlock.getNumBytes()) {
+          long fileLength = file.computeContentSummary().getLength();
+          DatanodeDescriptor[] targets = new DatanodeDescriptor[blocksMap.numNodes(last)];
+          Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(last);
+          for (int i = 0; it != null && it.hasNext(); i++) {
+            targets[i] = it.next();
+          }
+          // remove the replica locations of this block from the blocksMap
+          for (int i = 0; i < targets.length; i++) {
+            targets[i].removeBlock(storedBlock);
+          }
+          // set the locations of the last block in the lease record
+          file.setLastBlock(storedBlock, targets);
+
+          lb = new LocatedBlock(last, targets, 
+                                fileLength-storedBlock.getNumBytes());
+          if (isAccessTokenEnabled) {
+            lb.setBlockToken(accessTokenHandler.generateToken(lb.getBlock(), 
+                EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
+          }
+
+          // Remove block from replication queue.
+          updateNeededReplications(last, 0, 0);
+
+          // remove this block from the list of pending blocks to be deleted. 
+          // This reduces the possibility of triggering HADOOP-1349.
+          //
+          for(Collection<Block> v : recentInvalidateSets.values()) {
+            if (v.remove(last)) {
+              pendingDeletionBlocksCount--;
+            }
+          }
+        }
+      }
+    }
+    if (lb != null) {
+      if (NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: file "
+            +src+" for "+holder+" at "+clientMachine
+            +" block " + lb.getBlock()
+            +" block size " + lb.getBlock().getNumBytes());
+      }
+    }
+
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      logAuditEvent(UserGroupInformation.getCurrentUser(),
+                    Server.getRemoteIp(),
+                    "append", src, null, null);
+    }
+    return lb;
+  }
+
+  /**
+   * The client would like to obtain an additional block for the indicated
+   * filename (which is being written to).  Return an array that consists
+   * of the block, plus a set of machines.  The first machine on this list
+   * should be where the client writes data.  Subsequent items in the list
+   * must be provided in the connection to the first datanode.
+   *
+   * Make sure the previous blocks have been reported by datanodes and
+   * are replicated.  Will return an empty two-element array if we want the
+   * client to "try again later".
+   */
+  public LocatedBlock getAdditionalBlock(String src, 
+                                         String clientName
+                                         ) throws IOException {
+    long fileLength, blockSize;
+    int replication;
+    DatanodeDescriptor clientNode = null;
+    Block newBlock = null;
+
+    NameNode.stateChangeLog.debug("BLOCK* NameSystem.getAdditionalBlock: file "
+                                  +src+" for "+clientName);
+
+    synchronized (this) {
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot add block to " + src, safeMode);
+      }
+
+      // have we exceeded the configured limit of fs objects.
+      checkFsObjectLimit();
+
+      INodeFileUnderConstruction pendingFile  = checkLease(src, clientName);
+
+      //
+      // If we fail this, bad things happen!
+      //
+      if (!checkFileProgress(pendingFile, false)) {
+        throw new NotReplicatedYetException("Not replicated yet:" + src);
+      }
+      fileLength = pendingFile.computeContentSummary().getLength();
+      blockSize = pendingFile.getPreferredBlockSize();
+      clientNode = pendingFile.getClientNode();
+      replication = (int)pendingFile.getReplication();
+    }
+
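+    // Replica placement runs outside the global lock; the lease and file
+    // progress are re-verified below once the lock is reacquired.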
+    // choose targets for the new block to be allocated.
+    DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
+                                                           clientNode,
+                                                           null,
+                                                           blockSize);
+    if (targets.length < this.minReplication) {
+      throw new IOException("File " + src + " could only be replicated to " +
+                            targets.length + " nodes, instead of " +
+                            minReplication);
+    }
+
+    // Allocate a new block and record it in the INode. 
+    synchronized (this) {
+      INode[] pathINodes = dir.getExistingPathINodes(src);
+      int inodesLen = pathINodes.length;
+      checkLease(src, clientName, pathINodes[inodesLen-1]);
+      INodeFileUnderConstruction pendingFile  = (INodeFileUnderConstruction) 
+                                                pathINodes[inodesLen - 1];
+                                                           
+      if (!checkFileProgress(pendingFile, false)) {
+        throw new NotReplicatedYetException("Not replicated yet: " + src);
+      }
+
+      // allocate the new block and record the block locations in the INode.
+      newBlock = allocateBlock(src, pathINodes);
+      pendingFile.setTargets(targets);
+      
+      for (DatanodeDescriptor dn : targets) {
+        dn.incBlocksScheduled();
+      }      
+    }
+        
+    // Create next block
+    LocatedBlock b = new LocatedBlock(newBlock, targets, fileLength);
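+    // If block access tokens are enabled, attach a WRITE-capable token so
+    // the datanodes in the write pipeline will accept the client's data.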
+    if (isAccessTokenEnabled) {
+      b.setBlockToken(accessTokenHandler.generateToken(b.getBlock(), 
+          EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
+    }
+    return b;
+  }
+
+  /**
+   * The client would like to let go of the given block
+   */
+  public synchronized boolean abandonBlock(Block b, String src, String holder
+      ) throws IOException {
+    //
+    // Remove the block from the pending creates list
+    //
+    NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
+                                  +b+"of file "+src);
+    INodeFileUnderConstruction file = checkLease(src, holder);
+    dir.removeBlock(src, file, b);
+    NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
+                                    + b
+                                    + " is removed from pendingCreates");
+    return true;
+  }
+  
+  // make sure that we still have the lease on this file.
+  private INodeFileUnderConstruction checkLease(String src, String holder) 
+                                                      throws IOException {
+    INodeFile file = dir.getFileINode(src);
+    checkLease(src, holder, file);
+    return (INodeFileUnderConstruction)file;
+  }
+
+  private void checkLease(String src, String holder, INode file) 
+                                                     throws IOException {
+
+    if (file == null || file.isDirectory()) {
+      Lease lease = leaseManager.getLease(holder);
+      throw new LeaseExpiredException("No lease on " + src +
+                                      " File does not exist. " +
+                                      (lease != null ? lease.toString() :
+                                       "Holder " + holder + 
+                                       " does not have any open files."));
+    }
+    if (!file.isUnderConstruction()) {
+      Lease lease = leaseManager.getLease(holder);
+      throw new LeaseExpiredException("No lease on " + src + 
+                                      " File is not open for writing. " +
+                                      (lease != null ? lease.toString() :
+                                       "Holder " + holder + 
+                                       " does not have any open files."));
+    }
+    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)file;
+    if (holder != null && !pendingFile.getClientName().equals(holder)) {
+      throw new LeaseExpiredException("Lease mismatch on " + src + " owned by "
+          + pendingFile.getClientName() + " but is accessed by " + holder);
+    }
+  }
+
+  /**
+   * The FSNamesystem will already know the blocks that make up the file.
+   * Before we return, we make sure that all the file's blocks have 
+   * been reported by datanodes and are replicated correctly.
+   */
+  
+  enum CompleteFileStatus {
+    OPERATION_FAILED,
+    STILL_WAITING,
+    COMPLETE_SUCCESS
+  }
+  
+  public CompleteFileStatus completeFile(String src, String holder) throws IOException {
+    CompleteFileStatus status = completeFileInternal(src, holder);
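+    // Sync the edit log outside completeFileInternal's synchronized section,
+    // so that multiple concurrent closes can batch into a single sync.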
+    getEditLog().logSync();
+    return status;
+  }
+
+
+  private synchronized CompleteFileStatus completeFileInternal(String src, 
+                                                String holder) throws IOException {
+    NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder);
+    if (isInSafeMode())
+      throw new SafeModeException("Cannot complete file " + src, safeMode);
+    INode iFile = dir.getFileINode(src);
+    INodeFileUnderConstruction pendingFile = null;
+    Block[] fileBlocks = null;
+
+    if (iFile != null && iFile.isUnderConstruction()) {
+      pendingFile = (INodeFileUnderConstruction) iFile;
+      fileBlocks =  dir.getFileBlocks(src);
+    }
+    if (fileBlocks == null) {
+      NameNode.stateChangeLog.warn("DIR* NameSystem.completeFile: "
+                                   + "failed to complete " + src
+                                   + " because dir.getFileBlocks() is null"
+                                   + " and pendingFile is "
+                                   + ((pendingFile == null) ? "null" :
+                                     ("from " + pendingFile.getClientMachine()))
+                                  );
+      return CompleteFileStatus.OPERATION_FAILED;
+    } else if (!checkFileProgress(pendingFile, true)) {
+      return CompleteFileStatus.STILL_WAITING;
+    }
+
+    finalizeINodeFileUnderConstruction(src, pendingFile);
+
+    NameNode.stateChangeLog.info("DIR* NameSystem.completeFile: file " + src
+                                  + " is closed by " + holder);
+    return CompleteFileStatus.COMPLETE_SUCCESS;
+  }
+
+  /**
+   * Check all blocks of a file. If any block is replicated below its intended
+   * replication factor, insert it into neededReplications
+   */
+  private void checkReplicationFactor(INodeFile file) {
+    int numExpectedReplicas = file.getReplication();
+    Block[] pendingBlocks = file.getBlocks();
+    int nrBlocks = pendingBlocks.length;
+    for (int i = 0; i < nrBlocks; i++) {
+      // filter out containingNodes that are marked for decommission.
+      NumberReplicas number = countNodes(pendingBlocks[i]);
+      if (number.liveReplicas() < numExpectedReplicas) {
+        neededReplications.add(pendingBlocks[i], 
+                               number.liveReplicas(), 
+                               number.decommissionedReplicas,
+                               numExpectedReplicas);
+      }
+    }
+  }
+
+  static Random randBlockId = new Random();
+    
+  /**
+   * Allocate a block at the given pending filename
+   * 
+   * @param src path to the file
+   * @param inodes INode representing each of the components of src. 
+   *        <code>inodes[inodes.length-1]</code> is the INode for the file.
+   */
+  private Block allocateBlock(String src, INode[] inodes) throws IOException {
+    Block b = new Block(FSNamesystem.randBlockId.nextLong(), 0, 0); 
+    while(isValidBlock(b)) {
+      b.setBlockId(FSNamesystem.randBlockId.nextLong());
+    }
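+    // Block IDs are drawn at random; on the rare collision with an existing
+    // block, the loop above simply redraws until an unused ID is found.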
+    b.setGenerationStamp(getGenerationStamp());
+    b = dir.addBlock(src, inodes, b);
+    NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: "
+                                 +src+ ". "+b);
+    return b;
+  }
+
+  /**
+   * Check that the indicated file's blocks are present and
+   * replicated.  If not, return false. If checkall is true, then check
+   * all blocks, otherwise check only penultimate block.
+   */
+  synchronized boolean checkFileProgress(INodeFile v, boolean checkall) {
+    if (checkall) {
+      //
+      // check all blocks of the file.
+      //
+      for (Block block: v.getBlocks()) {
+        if (blocksMap.numNodes(block) < this.minReplication) {
+          return false;
+        }
+      }
+    } else {
+      //
+      // check the penultimate block of this file
+      //
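+      // (the last block is presumably still being written, so only the
+      // block before it is expected to have reached minReplication)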
+      Block b = v.getPenultimateBlock();
+      if (b != null) {
+        if (blocksMap.numNodes(b) < this.minReplication) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Remove all blocks pending invalidation on the given storage
+   * @param storageID storage ID of the datanode whose invalidate set is removed
+   */
+  void removeFromInvalidates(String storageID) {
+    Collection<Block> blocks = recentInvalidateSets.remove(storageID);
+    if (blocks != null) {
+      pendingDeletionBlocksCount -= blocks.size();
+    }
+  }
+
+  /**
+   * Adds a block to the list of blocks to be invalidated on the
+   * specified datanode, and logs the addition
+   * @param b block
+   * @param n datanode
+   */
+  void addToInvalidates(Block b, DatanodeInfo n) {
+    addToInvalidatesNoLog(b, n);
+    NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
+        + b.getBlockName() + " is added to invalidSet of " + n.getName());
+  }
+
+  /**
+   * Adds a block to the list of blocks to be invalidated on the
+   * specified datanode
+   * @param b block
+   * @param n datanode
+   */
+  private void addToInvalidatesNoLog(Block b, DatanodeInfo n) {
+    Collection<Block> invalidateSet = recentInvalidateSets.get(n.getStorageID());
+    if (invalidateSet == null) {
+      invalidateSet = new HashSet<Block>();
+      recentInvalidateSets.put(n.getStorageID(), invalidateSet);
+    }
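+    // Set.add returns false for duplicates, so pendingDeletionBlocksCount
+    // counts each (block, storage) pair at most once.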
+    if (invalidateSet.add(b)) {
+      pendingDeletionBlocksCount++;
+    }
+  }
+  
+  /**
+   * Adds a block to the invalidate lists of all datanodes
+   * that hold replicas of it.
+   */
+  private void addToInvalidates(Block b) {
+    for (Iterator<DatanodeDescriptor> it = 
+                                blocksMap.nodeIterator(b); it.hasNext();) {
+      DatanodeDescriptor node = it.next();
+      addToInvalidates(b, node);
+    }
+  }
+
+  /**
+   * dumps the contents of recentInvalidateSets
+   */
+  private synchronized void dumpRecentInvalidateSets(PrintWriter out) {
+    int size = recentInvalidateSets.values().size();
+    out.println("Metasave: Blocks " + pendingDeletionBlocksCount 
+        + " waiting deletion from " + size + " datanodes.");
+    if (size == 0) {
+      return;
+    }
+    for(Map.Entry<String,Collection<Block>> entry : recentInvalidateSets.entrySet()) {
+      Collection<Block> blocks = entry.getValue();
+      if (blocks.size() > 0) {
+        out.println(datanodeMap.get(entry.getKey()).getName() + blocks);
+      }
+    }
+  }
+
+  /**
+   * Mark the block belonging to datanode as corrupt
+   * @param blk Block to be marked as corrupt
+   * @param dn Datanode which holds the corrupt replica
+   */
+  public synchronized void markBlockAsCorrupt(Block blk, DatanodeInfo dn)
+    throws IOException {
+    DatanodeDescriptor node = getDatanode(dn);
+    if (node == null) {
+      throw new IOException("Cannot mark block" + blk.getBlockName() +
+                            " as corrupt because datanode " + dn.getName() +
+                            " does not exist. ");
+    }
+    
+    final BlockInfo storedBlockInfo = blocksMap.getStoredBlock(blk);
+    if (storedBlockInfo == null) {
+      // Check if the replica is in the blockMap, if not 
+      // ignore the request for now. This could happen when BlockScanner
+      // thread of Datanode reports bad block before Block reports are sent
+      // by the Datanode on startup
+      NameNode.stateChangeLog.info("BLOCK NameSystem.markBlockAsCorrupt: " +
+                                   "block " + blk + " could not be marked " +
+                                   "as corrupt as it does not exists in " +
+                                   "blocksMap");
+    } else {
+      INodeFile inode = storedBlockInfo.getINode();
+      if (inode == null) {
+        NameNode.stateChangeLog.info("BLOCK NameSystem.markBlockAsCorrupt: " +
+                                     "block " + blk + " could not be marked " +
+                                     "as corrupt as it does not belong to " +
+                                     "any file");
+        addToInvalidates(storedBlockInfo, node);
+        return;
+      } 
+      // Add this replica to corruptReplicas Map 
+      corruptReplicas.addToCorruptReplicasMap(storedBlockInfo, node);
+      if (countNodes(storedBlockInfo).liveReplicas()>inode.getReplication()) {
+        // the block is over-replicated so invalidate the replicas immediately
+        invalidateBlock(storedBlockInfo, node);
+      } else {
+        // add the block to neededReplication 
+        updateNeededReplications(storedBlockInfo, -1, 0);
+      }
+    }
+  }
+
+  /**
+   * Invalidates the given block on the given datanode.
+   */
+  public synchronized void invalidateBlock(Block blk, DatanodeInfo dn)
+    throws IOException {
+    NameNode.stateChangeLog.info("DIR* NameSystem.invalidateBlock: " 
+                                 + blk + " on " 
+                                 + dn.getName());
+    DatanodeDescriptor node = getDatanode(dn);
+    if (node == null) {
+      throw new IOException("Cannot invalidate block " + blk +
+                            " because datanode " + dn.getName() +
+                            " does not exist.");
+    }
+
+    // Check how many copies we have of the block.  Only if more than one
+    // live copy remains can this replica be deleted safely.
+    int count = countNodes(blk).liveReplicas();
+    if (count > 1) {
+      addToInvalidates(blk, dn);
+      removeStoredBlock(blk, node);
+      NameNode.stateChangeLog.debug("BLOCK* NameSystem.invalidateBlocks: "
+                                   + blk + " on " 
+                                   + dn.getName() + " listed for deletion.");
+    } else {
+      NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
+                                   + blk + " on " 
+                                   + dn.getName() + " is the only copy and was not deleted.");
+    }
+  }
+
+  ////////////////////////////////////////////////////////////////
+  // Here's how to handle block-copy failure during client write:
+  // -- As usual, the client's write should result in a streaming
+  // backup write to a k-machine sequence.
+  // -- If one of the backup machines fails, no worries.  Fail silently.
+  // -- Before client is allowed to close and finalize file, make sure
+  // that the blocks are backed up.  Namenode may have to issue specific backup
+  // commands to make up for earlier datanode failures.  Once all copies
+  // are made, edit namespace and return to client.
+  ////////////////////////////////////////////////////////////////
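+  //
+  // A sketch of the k-machine sequence described above, for k = 3:
+  //
+  //   client ==> DN1 ==> DN2 ==> DN3     (data streams down the pipeline)
+  //   client <== DN1 <== DN2 <== DN3     (acks flow back upstream)
+  //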
+
+  /** Change the indicated filename. */
+  public boolean renameTo(String src, String dst) throws IOException {
+    boolean status = renameToInternal(src, dst);
+    getEditLog().logSync();
+    if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
+      final HdfsFileStatus stat = dir.getFileInfo(dst);
+      logAuditEvent(UserGroupInformation.getCurrentUser(),
+                    Server.getRemoteIp(),
+                    "rename", src, dst, stat);
+    }
+    return status;
+  }
+
+  private synchronized boolean renameToInternal(String src, String dst
+      ) throws IOException {
+    NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src + " to " + dst);
+    if (isInSafeMode())
+      throw new SafeModeException("Cannot rename " + src, safeMode);
+    if (!DFSUtil.isValidName(dst)) {
+      throw new IOException("Invalid name: " + dst);
+    }
+
+    if (isPermissionEnabled) {
+      // We should not be doing this: this is move(), not renameTo();
+      // but for now,
+      String actualdst = dir.isDir(dst)?
+          dst + Path.SEPARATOR + new Path(src).getName(): dst;
+      checkParentAccess(src, FsAction.WRITE);
+      checkAncestorAccess(actualdst, FsAction.WRITE);
+    }
+
+    HdfsFileStatus dinfo = dir.getFileInfo(dst);
+    if (dir.renameTo(src, dst)) {
+      changeLease(src, dst, dinfo);     // update lease with new filename
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Remove the indicated filename from the namespace. If the filename
+   * refers to a non-empty directory and recursive is false, throw an exception.
+   */
+  public boolean delete(String src, boolean recursive) throws IOException {
+    if ((!recursive) && (!dir.isDirEmpty(src))) {
+      throw new IOException(src + " is not empty");
+    }
+    boolean status = deleteInternal(src, true);
+    getEditLog().logSync();
+    if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
+      logAuditEvent(UserGroupInformation.getCurrentUser(),
+                    Server.getRemoteIp(),
+                    "delete", src, null, null);
+    }
+    return status;
+  }
+
+  /**
+   * Remove the indicated filename from the namespace.  This may
+   * invalidate some blocks that make up the file.
+   */
+  synchronized boolean deleteInternal(String src, 
+      boolean enforcePermission) throws IOException {
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
+    }
+    if (isInSafeMode())
+      throw new SafeModeException("Cannot delete " + src, safeMode);
+    if (enforcePermission && isPermissionEnabled) {
+      checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL);
+    }
+
+    return dir.delete(src);
+  }
+
+  void removePathAndBlocks(String src, List<Block> blocks) throws IOException {
+    leaseManager.removeLeaseWithPrefixPath(src);
+    for(Block b : blocks) {
+      blocksMap.removeINode(b);
+      corruptReplicas.removeFromCorruptReplicasMap(b);
+      addToInvalidates(b);
+    }
+  }
+
+  /** Get the file info for a specific file.
+   * @param src The string representation of the path to the file
+   * @throws IOException if permission to access file is denied by the system 
+   * @return object containing information regarding the file
+   *         or null if file not found
+   */
+  HdfsFileStatus getFileInfo(String src) throws IOException {
+    if (isPermissionEnabled) {
+      checkTraverse(src);
+    }
+    return dir.getFileInfo(src);
+  }
+
+  /**
+   * Create all the necessary directories
+   */
+  public boolean mkdirs(String src, PermissionStatus permissions
+      ) throws IOException {
+    boolean status = mkdirsInternal(src, permissions);
+    getEditLog().logSync();
+    if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
+      final HdfsFileStatus stat = dir.getFileInfo(src);
+      logAuditEvent(UserGroupInformation.getCurrentUser(),
+                    Server.getRemoteIp(),
+                    "mkdirs", src, null, stat);
+    }
+    return status;
+  }
+    
+  /**
+   * Create all the necessary directories
+   */
+  private synchronized boolean mkdirsInternal(String src,
+      PermissionStatus permissions) throws IOException {
+    NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
+    if (isPermissionEnabled) {
+      checkTraverse(src);
+    }
+    if (dir.isDir(src)) {
+      // all callers of mkdirs() expect 'true' even if
+      // a new directory is not created.
+      return true;
+    }
+    if (isInSafeMode())
+      throw new SafeModeException("Cannot create directory " + src, safeMode);
+    if (!DFSUtil.isValidName(src)) {
+      throw new IOException("Invalid directory name: " + src);
+    }
+    if (isPermissionEnabled) {
+      checkAncestorAccess(src, FsAction.WRITE);
+    }
+
+    // validate that we have enough inodes. This is, at best, a
+    // heuristic because the mkdirs() operation might need to
+    // create multiple inodes.
+    checkFsObjectLimit();
+
+    if (!dir.mkdirs(src, permissions, false, now())) {
+      throw new IOException("Invalid directory name: " + src);
+    }
+    return true;
+  }
+
+  ContentSummary getContentSummary(String src) throws IOException {
+    if (isPermissionEnabled) {
+      checkPermission(src, false, null, null, null, FsAction.READ_EXECUTE);
+    }
+    return dir.getContentSummary(src);
+  }
+
+  /**
+   * Set the namespace quota and diskspace quota for a directory.
+   * See {@link ClientProtocol#setQuota(String, long, long)} for the 
+   * contract.
+   */
+  void setQuota(String path, long nsQuota, long dsQuota) throws IOException {
+    if (isInSafeMode())
+      throw new SafeModeException("Cannot set quota on " + path, safeMode);
+    if (isPermissionEnabled) {
+      checkSuperuserPrivilege();
+    }
+    
+    dir.setQuota(path, nsQuota, dsQuota);
+    getEditLog().logSync();
+  }
+  
+  /** Persist all metadata about this file.
+   * @param src The string representation of the path
+   * @param clientName The string representation of the client
+   * @throws IOException if path does not exist
+   */
+  void fsync(String src, String clientName) throws IOException {
+
+    NameNode.stateChangeLog.info("BLOCK* NameSystem.fsync: file "
+                                  + src + " for " + clientName);
+    synchronized (this) {
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot fsync file " + src, safeMode);
+      }
+      INodeFileUnderConstruction pendingFile  = checkLease(src, clientName);
+      dir.persistBlocks(src, pendingFile);
+    }
+  }
+
+  /**
+   * Move a file that is being written into an immutable, closed state.
+   * @param src The filename
+   * @param lease The lease for the client creating the file
+   */
+  void internalReleaseLease(Lease lease, String src) throws IOException {
+    LOG.info("Recovering lease=" + lease + ", src=" + src);
+
+    INodeFile iFile = dir.getFileINode(src);
+    if (iFile == null) {
+      final String message = "DIR* NameSystem.internalReleaseCreate: "
+        + "attempt to release a create lock on "
+        + src + " file does not exist.";
+      NameNode.stateChangeLog.warn(message);
+      throw new IOException(message);
+    }
+    if (!iFile.isUnderConstruction()) {
+      final String message = "DIR* NameSystem.internalReleaseCreate: "
+        + "attempt to release a create lock on "
+        + src + " but file is already closed.";
+      NameNode.stateChangeLog.warn(message);
+      throw new IOException(message);
+    }
+
+    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) iFile;
+
+    // Initialize lease recovery for pendingFile. If there are no blocks 
+    // associated with this file, then reap lease immediately. Otherwise 
+    // renew the lease and trigger lease recovery.
+    if (pendingFile.getTargets() == null ||
+        pendingFile.getTargets().length == 0) {
+      if (pendingFile.getBlocks().length == 0) {
+        finalizeINodeFileUnderConstruction(src, pendingFile);
+        NameNode.stateChangeLog.warn("BLOCK*"
+          + " internalReleaseLease: No blocks found, lease removed.");
+        return;
+      }
+      // set up the INode's targets for the last block from the blocksMap
+      //
+      Block[] blocks = pendingFile.getBlocks();
+      Block last = blocks[blocks.length-1];
+      DatanodeDescriptor[] targets = 
+         new DatanodeDescriptor[blocksMap.numNodes(last)];
+      Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(last);
+      for (int i = 0; it != null && it.hasNext(); i++) {
+        targets[i] = it.next();
+      }
+      pendingFile.setTargets(targets);
+    }
+    // start lease recovery of the last block for this file.
+    pendingFile.assignPrimaryDatanode();
+    leaseManager.renewLease(lease);
+  }
+
+  private void finalizeINodeFileUnderConstruction(String src,
+      INodeFileUnderConstruction pendingFile) throws IOException {
+    leaseManager.removeLease(pendingFile.clientName, src);
+
+    // The file is no longer pending.
+    // Create permanent INode, update blockmap
+    INodeFile newFile = pendingFile.convertToInodeFile();
+    dir.replaceNode(src, pendingFile, newFile);
+
+    // close file and persist block allocations for this file
+    dir.closeFile(src, newFile);
+
+    checkReplicationFactor(newFile);
+  }
+
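+  /**
+   * Commit the results of lease recovery for the last block of a file.
+   * Typically invoked by the primary datanode through DatanodeProtocol
+   * once the surviving replicas agree on a new generation stamp and length.
+   *
+   * @param lastblock the block being recovered
+   * @param newgenerationstamp the agreed-upon new generation stamp
+   * @param newlength the agreed-upon new length
+   * @param closeFile whether the file should be closed after the commit
+   * @param deleteblock whether the block should be removed from the file
+   * @param newtargets datanodes holding the recovered replicas
+   */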
+  synchronized void commitBlockSynchronization(Block lastblock,
+      long newgenerationstamp, long newlength,
+      boolean closeFile, boolean deleteblock, DatanodeID[] newtargets
+      ) throws IOException {
+    LOG.info("commitBlockSynchronization(lastblock=" + lastblock
+          + ", newgenerationstamp=" + newgenerationstamp
+          + ", newlength=" + newlength
+          + ", newtargets=" + Arrays.asList(newtargets)
+          + ", closeFile=" + closeFile
+          + ", deleteBlock=" + deleteblock
+          + ")");
+    final BlockInfo oldblockinfo = blocksMap.getStoredBlock(lastblock);
+    if (oldblockinfo == null) {
+      throw new IOException("Block (=" + lastblock + ") not found");
+    }
+    INodeFile iFile = oldblockinfo.getINode();
+    if (!iFile.isUnderConstruction()) {
+      throw new IOException("Unexpected block (=" + lastblock
+          + ") since the file (=" + iFile.getLocalName()
+          + ") is not under construction");
+    }
+    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
+
+
+    // Remove the old block from the blocks map. This always has to be done
+    // because the generation stamp of this block is changing.
+    blocksMap.removeBlock(oldblockinfo);
+
+    if (deleteblock) {
+      pendingFile.removeBlock(lastblock);
+    } else {
+      // update last block, construct newblockinfo and add it to the blocks map
+      lastblock.set(lastblock.getBlockId(), newlength, newgenerationstamp);
+      final BlockInfo newblockinfo = blocksMap.addINode(lastblock, pendingFile);
+
+      // find the DatanodeDescriptor objects
+      // There should be no locations in the blocksMap until now because the
+      // file is under construction
+      DatanodeDescriptor[] descriptors = null;
+      if (newtargets.length > 0) {
+        descriptors = new DatanodeDescriptor[newtargets.length];
+        for(int i = 0; i < newtargets.length; i++) {
+          descriptors[i] = getDatanode(newtargets[i]);
+        }
+      }
+      if (closeFile) {
+        // the file is getting closed. Insert block locations into blocksMap.
+        // Otherwise fsck will report these blocks as MISSING, especially if the
+        // blocksReceived from Datanodes take a long time to arrive.
+        for (int i = 0; i < descriptors.length; i++) {
+          descriptors[i].addBlock(newblockinfo);
+        }
+        pendingFile.setLastBlock(newblockinfo, null);
+      } else {
+        // add locations into the INodeUnderConstruction
+        pendingFile.setLastBlock(newblockinfo, descriptors);
+      }
+    }
+
+    // If this commit does not want to close the file, persist
+    // blocks only if append is supported and return
+    String src = leaseManager.findPath(pendingFile);
+    if (!closeFile) {
+      if (supportAppends) {
+        dir.persistBlocks(src, pendingFile);
+        getEditLog().logSync();
+      }
+      LOG.info("commitBlockSynchronization(" + lastblock + ") successful");
+      return;
+    }
+    
+    //remove lease, close file
+    finalizeINodeFileUnderConstruction(src, pendingFile);
+    getEditLog().logSync();
+    LOG.info("commitBlockSynchronization(newblock=" + lastblock
+          + ", file=" + src
+          + ", newgenerationstamp=" + newgenerationstamp
+          + ", newlength=" + newlength
+          + ", newtargets=" + Arrays.asList(newtargets) + ") successful");
+  }
+
+
+  /**
+   * Renew the lease(s) held by the given client
+   */
+  void renewLease(String holder) throws IOException {
+    if (isInSafeMode())
+      throw new SafeModeException("Cannot renew lease for " + holder, safeMode);
+    leaseManager.renewLease(holder);
+  }
+
+  /**
+   * Get a partial listing of the indicated directory
+   * 
+   * @param src the directory name
+   * @param startAfter the name to start after
+   * @return a partial listing starting after startAfter 
+   */
+  public DirectoryListing getListing(String src, byte[] startAfter)
+  throws IOException {
+    if (isPermissionEnabled) {
+      if (dir.isDir(src)) {
+        checkPathAccess(src, FsAction.READ_EXECUTE);
+      }
+      else {
+        checkTraverse(src);
+      }
+    }
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      logAuditEvent(UserGroupInformation.getCurrentUser(),
+                    Server.getRemoteIp(),
+                    "listStatus", src, null, null);
+    }
+    return dir.getListing(src, startAfter);
+  }
+
+  /////////////////////////////////////////////////////////
+  //
+  // These methods are called by datanodes
+  //
+  /////////////////////////////////////////////////////////
+  /**
+   * Register Datanode.
+   * <p>
+   * The purpose of registration is to identify whether the new datanode
+   * serves a new data storage, and will report new data block copies,
+   * which the namenode was not aware of; or the datanode is a replacement
+   * node for the data storage that was previously served by a different
+   * or the same (in terms of host:port) datanode.
+   * The data storages are distinguished by their storageIDs. When a new
+   * data storage is reported the namenode issues a new unique storageID.
+   * <p>
+   * Finally, the namenode returns its namespaceID as the registrationID
+   * for the datanodes. 
+   * namespaceID is a persistent attribute of the name space.
+   * The registrationID is checked every time the datanode is communicating
+   * with the namenode. 
+   * Datanodes with inappropriate registrationID are rejected.
+   * If the namenode stops and then restarts, it can restore its
+   * namespaceID and will continue serving the datanodes that have previously
+   * registered with the namenode, without restarting the whole cluster.
+   * 
+   * @see org.apache.hadoop.hdfs.server.datanode.DataNode#register()
+   */
+  public synchronized void registerDatanode(DatanodeRegistration nodeReg
+                                            ) throws IOException {
+    String dnAddress = Server.getRemoteAddress();
+    if (dnAddress == null) {
+      // Mostly called inside an RPC.
+      // But if not, use address passed by the data-node.
+      dnAddress = nodeReg.getHost();
+    }      
+
+    // check if the datanode is allowed to connect to the namenode
+    if (!verifyNodeRegistration(nodeReg, dnAddress)) {
+      throw new DisallowedDatanodeException(nodeReg);
+    }
+
+    String hostName = nodeReg.getHost();
+      
+    // update the datanode's name with ip:port
+    DatanodeID dnReg = new DatanodeID(dnAddress + ":" + nodeReg.getPort(),
+                                      nodeReg.getStorageID(),
+                                      nodeReg.getInfoPort(),
+                                      nodeReg.getIpcPort());
+    nodeReg.updateRegInfo(dnReg);
+    nodeReg.exportedKeys = getBlockKeys();
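+    // the registration reply carries the current block-token keys so the
+    // datanode can verify access tokens issued by this namenode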
+      
+    NameNode.stateChangeLog.info(
+                                 "BLOCK* NameSystem.registerDatanode: "
+                                 + "node registration from " + nodeReg.getName()
+                                 + " storage " + nodeReg.getStorageID());
+
+    DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
+    DatanodeDescriptor nodeN = host2DataNodeMap.getDatanodeByName(nodeReg.getName());
+      
+    if (nodeN != null && nodeN != nodeS) {
+      NameNode.LOG.info("BLOCK* NameSystem.registerDatanode: "
+                        + "node from name: " + nodeN.getName());
+      // nodeN previously served a different data storage, 
+      // which is not served by anybody anymore.
+      removeDatanode(nodeN);
+      // physically remove node from datanodeMap
+      wipeDatanode(nodeN);
+      nodeN = null;
+    }
+
+    if (nodeS != null) {
+      if (nodeN == nodeS) {
+        // The same datanode has just been restarted to serve the same data
+        // storage. We do not need to remove old data blocks; the delta will
+        // be calculated on the next block report from the datanode
+        NameNode.stateChangeLog.debug("BLOCK* NameSystem.registerDatanode: "
+                                      + "node restarted.");
+      } else {
+        // nodeS is found
+        /* The registering datanode is a replacement node for the existing
+          data storage, which from now on will be served by the new node.
+          If this message repeats, both nodes might have the same storageID
+          by (insanely rare) random chance. The user needs to restart one of
+          the nodes with its data cleared (or the user can just remove the
+          storageID value in the "VERSION" file under the data directory of
+          the datanode, but this might not work if the VERSION file format
+          has changed).
+       */
+        NameNode.stateChangeLog.info( "BLOCK* NameSystem.registerDatanode: "
+                                      + "node " + nodeS.getName()
+                                      + " is replaced by " + nodeReg.getName() + 
+                                      " with the same storageID " +
+                                      nodeReg.getStorageID());
+      }
+      // update cluster map
+      clusterMap.remove(nodeS);
+      nodeS.updateRegInfo(nodeReg);
+      nodeS.setHostName(hostName);
+      
+      // resolve network location
+      resolveNetworkLocation(nodeS);
+      clusterMap.add(nodeS);
+        
+      // also treat the registration message as a heartbeat
+      synchronized(heartbeats) {
+        if (!heartbeats.contains(nodeS)) {
+          heartbeats.add(nodeS);
+          //update its timestamp
+          nodeS.updateHeartbeat(0L, 0L, 0L, 0);
+          nodeS.isAlive = true;

[... 2931 lines stripped ...]

