hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r510181 [2/3] - in /lucene/hadoop/trunk: ./ conf/ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/util/ src/test/org/apache/hadoop/dfs/
Date Wed, 21 Feb 2007 20:11:01 GMT
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java.orig
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java.orig?view=auto&rev=510181
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java.orig (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java.orig Wed Feb 21 12:11:00 2007
@@ -0,0 +1,3740 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import org.apache.commons.logging.*;
+
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.util.*;
+import org.apache.hadoop.mapred.StatusHttpServer;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.fs.Path;
+
+import java.io.*;
+import java.util.*;
+
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+/***************************************************
+ * FSNamesystem does the actual bookkeeping work for the
+ * NameNode.
+ *
+ * It tracks several important tables.
+ *
+ * 1)  valid fsname --> blocklist  (kept on disk, logged)
+ * 2)  Set of all valid blocks (inverted #1)
+ * 3)  block --> machinelist (kept in memory, rebuilt dynamically from reports)
+ * 4)  machine --> blocklist (inverted #2)
+ * 5)  LRU cache of updated-heartbeat machines
+ ***************************************************/
+class FSNamesystem implements FSConstants {
+    // NOTE(review): log category says "fs" while this package is "dfs" — confirm intended.
+    public static final Log LOG = LogFactory.getLog("org.apache.hadoop.fs.FSNamesystem");
+
+    //
+    // Stores the correct file name hierarchy
+    //
+    FSDirectory dir;
+
+    //
+    // Stores the block-->datanode(s) map.  Updated only in response
+    // to client-sent information.
+    // Mapping: Block -> List<DatanodeDescriptor>
+    //
+    Map<Block, List<DatanodeDescriptor>> blocksMap = 
+                              new HashMap<Block, List<DatanodeDescriptor>>();
+
+    /**
+     * Stores the datanode -> block map.  
+     * <p>
+     * Done by storing a set of {@link DatanodeDescriptor} objects, sorted by 
+     * storage id. In order to keep the storage map consistent it tracks 
+     * all storages ever registered with the namenode.
+     * A descriptor corresponding to a specific storage id can be
+     * <ul> 
+     * <li>added to the map if it is a new storage id;</li>
+     * <li>updated with a new datanode started as a replacement for the old one 
+     * with the same storage id; and </li>
+     * <li>removed if and only if an existing datanode is restarted to serve a
+     * different storage id.</li>
+     * </ul> <br>
+     * The list of the {@link DatanodeDescriptor}s in the map is checkpointed
+     * in the namespace image file. Only the {@link DatanodeInfo} part is 
+     * persistent, the list of blocks is restored from the datanode block
+     * reports. 
+     * <p>
+     * Mapping: StorageID -> DatanodeDescriptor
+     */
+    Map<String, DatanodeDescriptor> datanodeMap = 
+                                      new TreeMap<String, DatanodeDescriptor>();
+
+    //
+    // Keeps a Collection for every named machine containing
+    // blocks that have recently been invalidated and are thought to live
+    // on the machine in question.
+    // Mapping: StorageID -> ArrayList<Block>
+    //
+    private Map<String, Collection<Block>> recentInvalidateSets = 
+                                      new TreeMap<String, Collection<Block>>();
+
+    //
+    // Keeps a TreeSet for every named node.  Each treeset contains
+    // a list of the blocks that are "extra" at that location.  We'll
+    // eventually remove these extras.
+    // Mapping: StorageID -> TreeSet<Block>
+    //
+    private Map<String, Collection<Block>> excessReplicateMap = 
+                                      new TreeMap<String, Collection<Block>>();
+
+    //
+    // Keeps track of files that are being created, plus the
+    // blocks that make them up.
+    // Mapping: fileName -> FileUnderConstruction
+    //
+    Map<UTF8, FileUnderConstruction> pendingCreates = 
+                                  new TreeMap<UTF8, FileUnderConstruction>();
+
+    //
+    // Keeps track of the blocks that are part of those pending creates
+    // Set of: Block
+    //
+    Collection<Block> pendingCreateBlocks = new TreeSet<Block>();
+
+    //
+    // Stats on overall usage
+    //
+    long totalCapacity = 0, totalRemaining = 0;
+
+    // total number of connections per live datanode
+    int totalLoad = 0;
+
+
+    //
+    // For the HTTP browsing interface
+    //
+    StatusHttpServer infoServer;
+    int infoPort;
+    String infoBindAddress;
+    Date startTime;
+    
+    // Shared source of randomness (e.g. for picking among datanodes).
+    Random r = new Random();
+
+    /**
+     * Stores a set of DatanodeDescriptor objects.
+     * This is a subset of {@link #datanodeMap}, containing nodes that are 
+     * considered alive.
+     * The {@link HeartbeatMonitor} periodically checks for outdated entries,
+     * and removes them from the list.
+     */
+    ArrayList<DatanodeDescriptor> heartbeats = new ArrayList<DatanodeDescriptor>();
+
+    //
+    // Store set of Blocks that need to be replicated 1 or more times.
+    // We also store pending replication-orders.
+    // Set of: Block
+    //
+    private UnderReplicationBlocks neededReplications = new UnderReplicationBlocks();
+    private Collection<Block> pendingReplications = new TreeSet<Block>();
+
+    //
+    // Used for handling lock-leases
+    // Mapping: leaseHolder -> Lease
+    //
+    private Map<UTF8, Lease> leases = new TreeMap<UTF8, Lease>();
+    // Set of: Lease
+    private SortedSet<Lease> sortedLeases = new TreeSet<Lease>();
+
+    //
+    // Threaded object that checks to see if we have been
+    // getting heartbeats from all datanodes. 
+    //
+    Daemon hbthread = null;   // HeartbeatMonitor thread
+    Daemon lmthread = null;   // LeaseMonitor thread
+    Daemon smmthread = null;  // SafeModeMonitor thread
+    Daemon replthread = null;  // Replication thread
+    boolean fsRunning = true;
+    long systemStart = 0;
+
+    //  The maximum number of replicas we should allow for a single block
+    private int maxReplication;
+    //  How many outgoing replication streams a given node should have at one time
+    private int maxReplicationStreams;
+    // MIN_REPLICATION is how many copies we need in place or else we disallow the write
+    private int minReplication;
+    // Default replication
+    private int defaultReplication;
+    // heartbeatRecheckInterval is how often namenode checks for expired datanodes
+    private long heartbeatRecheckInterval;
+    // heartbeatExpireInterval is how long namenode waits for datanode to report
+    // heartbeat
+    private long heartbeatExpireInterval;
+    //replicationRecheckInterval is how often namenode checks for new replication work
+    private long replicationRecheckInterval;
+    static int replIndex = 0; // last datanode used for replication work
+    static int REPL_WORK_PER_ITERATION = 32; // max datanodes per iteration (NOTE(review): original comment said "percent" — confirm units)
+
+    public static FSNamesystem fsNamesystemObject;
+    private String localMachine;
+    private int port;
+    private SafeModeInfo safeMode;  // safe mode information
+    
+    // datanode network topology
+    NetworkTopology clusterMap = new NetworkTopology();
+    // for block replicas placement
+    Replicator replicator = new Replicator();
+
+    /**
+     * Create an active FSNamesystem and start its service threads.
+     *
+     * @param dirs list of directories where the filesystem directory state
+     *             is stored
+     * @param hostname host this namenode runs on
+     * @param port port this namenode listens on
+     * @param nn owning NameNode, exposed as an attribute of the HTTP server
+     * @param conf configuration parameters
+     * @throws IOException if the replication configuration is inconsistent
+     *         or the image/HTTP server cannot be initialized
+     */
+    public FSNamesystem(File[] dirs, 
+                        String hostname,
+                        int port,
+                        NameNode nn, Configuration conf) throws IOException {
+        fsNamesystemObject = this;
+        this.defaultReplication = conf.getInt("dfs.replication", 3);
+        this.maxReplication = conf.getInt("dfs.replication.max", 512);
+        this.minReplication = conf.getInt("dfs.replication.min", 1);
+        // Sanity-check the configured replication range: 0 < min <= max < Short.MAX_VALUE.
+        if( minReplication <= 0 )
+          throw new IOException(
+              "Unexpected configuration parameters: dfs.replication.min = " 
+              + minReplication
+              + " must be greater than 0" );
+        if( maxReplication >= (int)Short.MAX_VALUE )
+          throw new IOException(
+              "Unexpected configuration parameters: dfs.replication.max = " 
+              + maxReplication + " must be less than " + (Short.MAX_VALUE) );
+        if( maxReplication < minReplication )
+          throw new IOException(
+              "Unexpected configuration parameters: dfs.replication.min = " 
+              + minReplication
+              + " must be less than dfs.replication.max = " 
+              + maxReplication );
+        this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
+        long heartbeatInterval = conf.getLong("dfs.heartbeat.interval", 3) * 1000;
+        this.heartbeatRecheckInterval = 5 * 60 * 1000; // 5 minutes
+        this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval +
+            10 * heartbeatInterval;
+        this.replicationRecheckInterval = 3 * 1000; //  3 second
+
+        this.localMachine = hostname;
+        this.port = port;
+        // Load the namespace image from disk before any monitors start.
+        this.dir = new FSDirectory(dirs);
+        this.dir.loadFSImage( conf );
+        this.safeMode = new SafeModeInfo( conf );
+        setBlockTotal();
+        // Start the daemons that watch heartbeats, leases and replication work.
+        this.hbthread = new Daemon(new HeartbeatMonitor());
+        this.lmthread = new Daemon(new LeaseMonitor());
+        this.replthread = new Daemon(new ReplicationMonitor());
+        hbthread.start();
+        lmthread.start();
+        replthread.start();
+        this.systemStart = now();
+        this.startTime = new Date(systemStart); 
+
+        // Bring up the HTTP status/browsing interface.
+        this.infoPort = conf.getInt("dfs.info.port", 50070);
+        this.infoBindAddress = conf.get("dfs.info.bindAddress", "0.0.0.0");
+        this.infoServer = new StatusHttpServer("dfs",infoBindAddress, infoPort, false);
+        this.infoServer.setAttribute("name.system", this);
+        this.infoServer.setAttribute("name.node", nn);
+        this.infoServer.setAttribute("name.conf", conf);
+        this.infoServer.addServlet("fsck", "/fsck", FsckServlet.class);
+        this.infoServer.addServlet("getimage", "/getimage", GetImageServlet.class);
+        this.infoServer.start();
+    }
+
+    /**
+     * Create an FSNamesystem whose directory state is backed by the given
+     * {@link FSImage}; unlike the primary constructor, no monitor threads
+     * or HTTP server are started.
+     */
+    FSNamesystem(FSImage fsImage) throws IOException {
+        fsNamesystemObject = this;
+        this.dir = new FSDirectory(fsImage);
+    }
+
+    /** Returns the single, globally shared FSNamesystem instance
+     *  (set by whichever constructor ran last). */
+    public static FSNamesystem getFSNamesystem() {
+        return FSNamesystem.fsNamesystemObject;
+    }
+
+    /** Close down this filesystem manager.
+     * Causes heartbeat and lease daemons to stop; waits briefly for
+     * them to finish, but a short timeout returns control back to caller.
+     * The interrupt status is restored if the wait itself is interrupted,
+     * so callers can observe the interruption.
+     */
+    public void close() {
+      synchronized (this) {
+        fsRunning = false;
+      }
+        try {
+            infoServer.stop();
+            hbthread.join(3000);
+            replthread.join(3000);
+        } catch (InterruptedException ie) {
+            // Do not swallow the interrupt: re-assert it for the caller.
+            Thread.currentThread().interrupt();
+        } finally {
+          // using finally to ensure we also wait for lease daemon
+          try {
+            lmthread.join(3000);
+          } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+          } finally {
+              try {
+                dir.close();
+              } catch (IOException ex) {
+                  // best-effort shutdown: nothing useful to do here
+              }
+          }
+        }
+    }
+    
+    /* Returns the replication factor of the file owning the given block,
+     * or 0 when the block no longer belongs to any file. */
+    private int getReplication( Block block ) {
+        FSDirectory.INode fileINode = dir.getFileByBlock(block);
+        return (fileINode == null) ? 0 : fileINode.getReplication();
+    }
+
+    /* Class for keeping track of under-replicated blocks.
+     * Blocks have a replication priority, with priority 0 indicating the
+     * highest; blocks that have only a single replica get the highest
+     * priority.
+     */
+    private class UnderReplicationBlocks {
+        private static final int LEVEL = 3;
+        // One TreeSet per priority level; index 0 is most urgent.
+        TreeSet<Block>[] priorityQueues = new TreeSet[LEVEL];
+        
+        /* Create one empty queue per priority level. */
+        UnderReplicationBlocks() {
+            for(int i=0; i<LEVEL; i++) {
+                priorityQueues[i] = new TreeSet<Block>();
+            }
+        }
+        
+        /* Return the total number of under-replicated blocks */
+        synchronized int size() {
+            int size = 0;
+            for( int i=0; i<LEVEL; i++ ) {
+                size += priorityQueues[i].size();
+            }
+            return size;
+        }
+        
+        /* Check if a block is in the neededReplication queue */
+        synchronized boolean contains(Block block) {
+            for(TreeSet<Block> set:priorityQueues) {
+                if(set.contains(block)) return true;
+            }
+            return false;
+        }
+        
+        /* Return the priority of a block
+        * @param block an under-replicated block
+        * @param curReplicas current number of replicas of the block
+        * @param expectedReplicas expected number of replicas of the block
+        */
+        private int getPriority(Block block, 
+                int curReplicas, int expectedReplicas) {
+            if (curReplicas>=expectedReplicas) {
+                return LEVEL; // no need to replicate
+            } else if(curReplicas==1) {
+                return 0; // highest priority
+            } else if(curReplicas*3<expectedReplicas) {
+                return 1;
+            } else {
+                return 2;
+            }
+        }
+        
+        /* add a block to an under-replication queue according to its priority
+         * @param block an under-replicated block
+         * @param curReplicas current number of replicas of the block
+         * @param expectedReplicas expected number of replicas of the block
+         */
+        synchronized boolean add(
+            Block block, int curReplicas, int expectedReplicas) {
+            if(expectedReplicas <= curReplicas) {
+                return false;
+            }
+            int priLevel = getPriority(block, curReplicas, expectedReplicas);
+            if( priorityQueues[priLevel].add(block) ) {
+                NameNode.stateChangeLog.debug(
+                        "BLOCK* NameSystem.UnderReplicationBlock.add:"
+                      + block.getBlockName()
+                      + " has only "+curReplicas
+                      + " replicas and need " + expectedReplicas
+                      + " replicas so is added to neededReplications"
+                      + " at priority level " + priLevel );
+                return true;
+            }
+            return false;
+        }
+
+        /* add a block to an under-replication queue, computing the current
+         * and expected replica counts from the enclosing namesystem */
+        synchronized boolean add(Block block) {
+            int curReplicas = countContainingNodes(blocksMap.get(block));
+            int expectedReplicas = getReplication(block);
+            return add(block, curReplicas, expectedReplicas);
+        }
+        
+        /* remove a block from an under-replication queue */
+        synchronized boolean remove(Block block, 
+                int oldReplicas, int oldExpectedReplicas) {
+            if(oldExpectedReplicas <= oldReplicas) {
+                return false;
+            }
+            int priLevel = getPriority(block, oldReplicas, oldExpectedReplicas);
+            return remove(block, priLevel);
+        }
+        
+        /* remove a block from an under-replication queue given a priority;
+         * falls back to scanning the other levels if the block is not found
+         * at the expected one */
+        private boolean remove(Block block, int priLevel ) {
+            if( priorityQueues[priLevel].remove(block) ) {
+                NameNode.stateChangeLog.debug(
+                     "BLOCK* NameSystem.UnderReplicationBlock.remove: "
+                   + "Removing block " + block.getBlockName()
+                   + " from priority queue "+ priLevel );
+                return true;
+            } else {
+                for(int i=0; i<LEVEL; i++) {
+                    if( i!=priLevel && priorityQueues[i].remove(block) ) {
+                        NameNode.stateChangeLog.debug(
+                             "BLOCK* NameSystem.UnderReplicationBlock.remove: "
+                           + "Removing block " + block.getBlockName()
+                           + " from priority queue "+ i );
+                        return true;
+                    }
+                }
+            }
+            return false;
+        }
+        
+        /* remove a block from an under-replication queue */
+        synchronized boolean remove(Block block) {
+            int curReplicas = countContainingNodes(blocksMap.get(block));
+            int expectedReplicas = getReplication(block);
+            return remove(block, curReplicas, expectedReplicas);
+        }
+        
+        /* update the priority level of a block after its replica count or
+         * expected replication changed by the given deltas */
+        synchronized void update(Block block,
+                int curReplicasDelta, int expectedReplicasDelta) {
+            int curReplicas = countContainingNodes(blocksMap.get(block));
+            int curExpectedReplicas = getReplication(block);
+            // Reconstruct the counts the block had before the change so we
+            // can locate (and remove from) its old priority queue.
+            int oldReplicas = curReplicas-curReplicasDelta;
+            int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
+            int curPri = getPriority(block, curReplicas, curExpectedReplicas);
+            int oldPri = getPriority(block, oldReplicas, oldExpectedReplicas);
+            NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " + 
+                               block +
+                               " curReplicas " + curReplicas +
+                               " curExpectedReplicas " + curExpectedReplicas +
+                               " oldReplicas " + oldReplicas +
+                               " oldExpectedReplicas  " + oldExpectedReplicas +
+                               " curPri  " + curPri +
+                               " oldPri  " + oldPri);
+            if( oldPri != LEVEL && oldPri != curPri ) {
+                remove(block, oldPri);
+            }
+            if( curPri != LEVEL && oldPri != curPri 
+                    && priorityQueues[curPri].add(block)) {
+                NameNode.stateChangeLog.debug(
+                        "BLOCK* NameSystem.UnderReplicationBlock.update:"
+                      + block.getBlockName()
+                      + " has only "+curReplicas
+                      + " replicas and need " + curExpectedReplicas
+                      + " replicas so is added to neededReplications"
+                      + " at priority level " + curPri );
+            }
+        }
+        
+        /* return an iterator over all the under-replicated blocks, walking
+         * the priority queues from most to least urgent */
+        synchronized Iterator<Block> iterator() {
+            return new Iterator<Block>() {
+                int level;
+                Iterator<Block>[] iterator = new Iterator[LEVEL];
+                
+                {
+                    level=0;
+                    for(int i=0; i<LEVEL; i++) {
+                        iterator[i] = priorityQueues[i].iterator();
+                    }
+                }
+                
+                // Advance past exhausted levels (never past the last one).
+                private void update() {
+                    while( level< LEVEL-1 && !iterator[level].hasNext()  ) {
+                        level++;
+                    }
+                }
+                
+                public Block next() {
+                    update();
+                    return iterator[level].next();
+                }
+                
+                public boolean hasNext() {
+                    update();
+                    return iterator[level].hasNext();
+                }
+                
+                public void remove() {
+                    iterator[level].remove();
+                }
+            };
+        }
+    }
+    
+    /////////////////////////////////////////////////////////
+    //
+    // These methods are called by HadoopFS clients
+    //
+    /////////////////////////////////////////////////////////
+    /**
+     * The client wants to open the given filename.  Return a
+     * list of (block,machineArray) pairs.  The sequence of unique blocks
+     * in the list indicates all the blocks that make up the filename.
+     *
+     * The client should choose one of the machines from the machineArray
+     * at random.
+     *
+     * @param clientMachine host the request originated from, used to sort
+     *        replica locations by network distance
+     * @param src file to open
+     * @return a two-element array {Block[], DatanodeDescriptor[][]}, or
+     *         null if the file does not exist
+     */
+    public Object[] open(String clientMachine, UTF8 src) {
+        Object results[] = null;
+        Block blocks[] = dir.getFile(src);
+        if (blocks != null) {
+            results = new Object[2];
+            DatanodeDescriptor machineSets[][] = new DatanodeDescriptor[blocks.length][];
+
+            for (int i = 0; i < blocks.length; i++) {
+                Collection<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
+                if (containingNodes == null) {
+                    machineSets[i] = new DatanodeDescriptor[0];
+                } else {
+                    // Copy the containing nodes into a list and sort them by
+                    // distance from the client.  (The original code also
+                    // allocated a DatanodeDescriptor[] here that was
+                    // immediately overwritten — dead store, removed.)
+                    ArrayList<DatanodeDescriptor> containingNodesList =
+                      new ArrayList<DatanodeDescriptor>(containingNodes.size());
+                    containingNodesList.addAll(containingNodes);
+                    
+                    machineSets[i] = replicator.sortByDistance(
+                        getDatanodeByHost(clientMachine), containingNodesList);
+                }
+            }
+
+            results[0] = blocks;
+            results[1] = machineSets;
+        }
+        return results;
+    }
+
+    /**
+     * Set replication for an existing file.
+     * 
+     * The NameNode sets new replication and schedules either replication of 
+     * under-replicated data blocks or removal of the excessive block copies 
+     * if the blocks are over-replicated.
+     * 
+     * @see ClientProtocol#setReplication(String, short)
+     * @param src file name
+     * @param replication new replication
+     * @return true if successful; 
+     *         false if file does not exist or is a directory
+     * @throws IOException if in safe mode or replication is out of range
+     * @author shv
+     */
+    public synchronized boolean setReplication(String src, 
+                                               short replication
+                                              ) throws IOException {
+      if( isInSafeMode() )
+        throw new SafeModeException( "Cannot set replication for " + src, safeMode );
+      verifyReplication(src, replication, null );
+
+      Vector<Integer> oldReplication = new Vector<Integer>();
+      Block[] fileBlocks;
+      fileBlocks = dir.setReplication( src, replication, oldReplication );
+      if( fileBlocks == null )  // file not found or is a directory
+        return false;
+      int oldRepl = oldReplication.elementAt(0).intValue();
+      if( oldRepl == replication ) // the same replication
+        return true;
+
+      // update needReplication priority queues; only log "Increasing" when
+      // the replication actually grew (the original logged it unconditionally)
+      if( replication > oldRepl )
+        LOG.info("Increasing replication for file " + src 
+                + ". New replication is " + replication );
+      for( int idx = 0; idx < fileBlocks.length; idx++ )
+          neededReplications.update( fileBlocks[idx], 0, replication-oldRepl );
+      
+      if( oldRepl > replication ) {  
+        // old replication > the new one; need to remove copies
+        LOG.info("Reducing replication for file " + src 
+                + ". New replication is " + replication );
+        for( int idx = 0; idx < fileBlocks.length; idx++ )
+          proccessOverReplicatedBlock( fileBlocks[idx], replication );
+      }
+      return true;
+    }
+    
+    /** Returns the block size recorded for the given file. */
+    public long getBlockSize(String filename) throws IOException {
+        return this.dir.getBlockSize(filename);
+    }
+    
+    /**
+     * Check whether the replication parameter is within the range
+     * determined by system configuration; throws IOException with a
+     * descriptive message when it is not.
+     */
+    private void verifyReplication( String src, 
+                                    short replication, 
+                                    UTF8 clientName 
+                                  ) throws IOException {
+      // Build the shared prefix of both possible error messages.
+      StringBuilder msg = new StringBuilder("file ").append(src);
+      if (clientName != null) {
+        msg.append(" on client ").append(clientName);
+      }
+      msg.append(".\n").append("Requested replication ").append(replication);
+      String text = msg.toString();
+
+      if( replication > maxReplication )
+        throw new IOException( text + " exceeds maximum " + maxReplication );
+
+      if( replication < minReplication )
+        throw new IOException(
+            text + " is less than the required minimum " + minReplication );
+    }
+    
+    /**
+     * The client would like to create a new block for the indicated
+     * filename.  Return an array that consists of the block, plus a set 
+     * of machines.  The first on this list should be where the client 
+     * writes data.  Subsequent items in the list must be provided in
+     * the connection to the first datanode.
+     *
+     * @param src path of the file to create
+     * @param holder lease holder (client name) requesting the create
+     * @param clientMachine machine originating the request; used for
+     *        replica placement
+     * @param overwrite whether an existing file may be deleted first
+     * @param replication requested replication factor
+     * @param blockSize requested block size
+     * @return Return an array that consists of the block, plus a set
+     * of machines
+     * @throws IOException if the filename is invalid
+     *         {@link FSDirectory#isValidToCreate(UTF8)}.
+     */
+    public synchronized Object[] startFile( UTF8 src, 
+                                            UTF8 holder, 
+                                            UTF8 clientMachine, 
+                                            boolean overwrite,
+                                            short replication,
+                                            long blockSize
+                                          ) throws IOException {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: file "
+            +src+" for "+holder+" at "+clientMachine);
+      if( isInSafeMode() )
+        // fixed: original message read "Cannot create fileFOO" (missing space)
+        throw new SafeModeException( "Cannot create file " + src, safeMode );
+      if (!isValidName(src.toString())) {
+        throw new IOException("Invalid file name: " + src);
+      }
+      try {
+        FileUnderConstruction pendingFile = pendingCreates.get(src);
+        if (pendingFile != null) {
+          //
+          // If the file exists in pendingCreate, then it must be in our
+          // leases. Find the appropriate lease record.
+          //
+          Lease lease = leases.get(holder);
+          //
+          // We found the lease for this file. And surprisingly the original
+          // holder is trying to recreate this file. This should never occur.
+          //
+          if (lease != null) {
+            throw new AlreadyBeingCreatedException(
+                  "failed to create file " + src + " for " + holder +
+                  " on client " + clientMachine + 
+                  " because current leaseholder is trying to recreate file.");
+          }
+          //
+          // Find the original holder.
+          //
+          UTF8 oldholder = pendingFile.getClientName();
+          lease = leases.get(oldholder);
+          if (lease == null) {
+            throw new AlreadyBeingCreatedException(
+                  "failed to create file " + src + " for " + holder +
+                  " on client " + clientMachine + 
+                  " because pendingCreates is non-null but no leases found.");
+          }
+          //
+          // If the original holder has not renewed in the last SOFTLIMIT 
+          // period, then reclaim all resources and allow this request 
+          // to proceed. Otherwise, prevent this request from creating file.
+          //
+          if (lease.expiredSoftLimit()) {
+            lease.releaseLocks();
+            leases.remove(lease.holder);
+            LOG.info("Removing lease " + lease + " ");
+            if (!sortedLeases.remove(lease)) {
+              LOG.error("Unknown failure trying to remove " + lease + 
+                       " from lease set.");
+            }
+          } else  {
+            throw new AlreadyBeingCreatedException(
+                  "failed to create file " + src + " for " + holder +
+                  " on client " + clientMachine + 
+                  " because pendingCreates is non-null.");
+          }
+        }
+
+        try {
+           verifyReplication(src.toString(), replication, clientMachine );
+        } catch( IOException e) {
+            throw new IOException( "failed to create "+e.getMessage());
+        }
+        if (!dir.isValidToCreate(src)) {
+          if (overwrite) {
+            delete(src);
+          } else {
+            throw new IOException("failed to create file " + src 
+                    +" on client " + clientMachine
+                    +" either because the filename is invalid or the file exists");
+          }
+        }
+
+        // Get the array of replication targets
+        DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
+            getDatanodeByHost(clientMachine.toString()), null, blockSize);
+        if (targets.length < this.minReplication) {
+            throw new IOException("failed to create file "+src
+                    +" on client " + clientMachine
+                    +" because target-length is " + targets.length 
+                    +", below MIN_REPLICATION (" + minReplication+ ")");
+        }
+
+        // Reserve space for this pending file
+        pendingCreates.put(src, 
+                           new FileUnderConstruction(replication, 
+                                                     blockSize,
+                                                     holder,
+                                                     clientMachine));
+        NameNode.stateChangeLog.debug( "DIR* NameSystem.startFile: "
+                   +"add "+src+" to pendingCreates for "+holder );
+        // Record (or renew) the creator's lease on this file.
+        synchronized (leases) {
+            Lease lease = leases.get(holder);
+            if (lease == null) {
+                lease = new Lease(holder);
+                leases.put(holder, lease);
+                sortedLeases.add(lease);
+            } else {
+                // re-insert so sortedLeases reflects the renewed timestamp
+                sortedLeases.remove(lease);
+                lease.renew();
+                sortedLeases.add(lease);
+            }
+            lease.startedCreate(src);
+        }
+
+        // Create next block
+        Object results[] = new Object[2];
+        results[0] = allocateBlock(src);
+        results[1] = targets;
+        return results;
+      } catch (IOException ie) {
+          NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
+                  +ie.getMessage());
+        throw ie;
+      }
+    }
+
+    /**
+     * The client would like to obtain an additional block for the indicated
+     * filename (which is being written-to).  Return an array that consists
+     * of the block, plus a set of machines.  The first on this list should
+     * be where the client writes data.  Subsequent items in the list must
+     * be provided in the connection to the first datanode.
+     *
+     * Make sure the previous blocks have been reported by datanodes and
+     * are replicated.  Will return an empty 2-elt array if we want the
+     * client to "try again later".
+     */
+    public synchronized Object[] getAdditionalBlock(UTF8 src, 
+                                                    UTF8 clientName
+                                                    ) throws IOException {
+        NameNode.stateChangeLog.debug("BLOCK* NameSystem.getAdditionalBlock: file "
+            +src+" for "+clientName);
+        if (isInSafeMode()) {
+          throw new SafeModeException( "Cannot add block to " + src, safeMode );
+        }
+
+        // The caller must still hold the create-lease for this file.
+        FileUnderConstruction pending = pendingCreates.get(src);
+        if (pending == null) {
+          throw new LeaseExpiredException("No lease on " + src);
+        }
+        if (!pending.getClientName().equals(clientName)) {
+          throw new LeaseExpiredException("Lease mismatch on " + src + 
+              " owned by " + pending.getClientName() + 
+              " and appended by " + clientName);
+        }
+        if (dir.getFile(src) != null) {
+          throw new IOException("File " + src + " created during write");
+        }
+
+        // Every previously allocated block must have reached minimum
+        // replication before another block may be handed out.
+        if (!checkFileProgress(src)) {
+          throw new NotReplicatedYetException("Not replicated yet");
+        }
+
+        // Pick the datanodes that should receive the new block.
+        DatanodeDescriptor clientNode =
+            getDatanodeByHost(pending.getClientMachine().toString());
+        DatanodeDescriptor targets[] = replicator.chooseTarget(
+            (int) (pending.getReplication()),
+            clientNode,
+            null,
+            pending.getBlockSize());
+        if (targets.length < this.minReplication) {
+          throw new IOException("File " + src + " could only be replicated to " +
+                                targets.length + " nodes, instead of " +
+                                minReplication);
+        }
+
+        // Allocate the new block and hand it back with its targets.
+        return new Object[]{allocateBlock(src), targets};
+    }
+
+    /**
+     * The client would like to let go of the given block
+     */
+    public synchronized boolean abandonBlock(Block b, UTF8 src) {
+        //
+        // Remove the block from the pending creates list
+        //
+        NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
+                +b.getBlockName()+"of file "+src );
+        FileUnderConstruction pending = pendingCreates.get(src);
+        if (pending == null) {
+            // nothing is being created under this name
+            return false;
+        }
+        for (Iterator<Block> it = pending.getBlocks().iterator(); it.hasNext(); ) {
+            Block candidate = it.next();
+            if (candidate.compareTo(b) != 0) {
+                continue;
+            }
+            // drop the block from both pending bookkeeping structures
+            pendingCreateBlocks.remove(candidate);
+            it.remove();
+            NameNode.stateChangeLog.debug(
+                     "BLOCK* NameSystem.abandonBlock: "
+                    +b.getBlockName()
+                    +" is removed from pendingCreateBlock and pendingCreates");
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Abandon the entire file in progress
+     */
+    public synchronized void abandonFileInProgress(UTF8 src, 
+                                                   UTF8 holder
+                                                   ) throws IOException {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.abandonFileInProgress:" + src );
+      synchronized (leases) {
+        Lease lease = leases.get(holder);
+        if (lease == null) {
+          // no lease recorded for this holder at all
+          LOG.info("Attempt to release a lock from an unknown lease holder "
+              + holder.toString() + " for " + src.toString());
+        } else if (lease.completedCreate(src)) {
+          // the holder really owned this pending create; drop it
+          internalReleaseCreate(src, holder);
+        } else {
+          // the lease exists but does not cover this file
+          LOG.info("Attempt by " + holder.toString() + 
+              " to release someone else's create lock on " + 
+              src.toString());
+        }
+      }
+    }
+
+    /**
+     * Finalize the created file and make it world-accessible.  The
+     * FSNamesystem will already know the blocks that make up the file.
+     * Before we return, we make sure that all the file's blocks have 
+     * been reported by datanodes and are replicated correctly.
+     *
+     * @param src the pending filename being completed
+     * @param holder the client (lease holder) that created the file
+     * @return COMPLETE_SUCCESS, STILL_WAITING (blocks not yet sufficiently
+     *         replicated), or OPERATION_FAILED
+     * @throws SafeModeException if the namenode is in safe mode
+     */
+    public synchronized int completeFile( UTF8 src, 
+                                          UTF8 holder) throws IOException {
+        NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder );
+        if( isInSafeMode() )
+          throw new SafeModeException( "Cannot complete file " + src, safeMode );
+        // Fail if the name already exists in the namespace, or if no
+        // create is actually pending for it.
+        if (dir.getFile(src) != null || pendingCreates.get(src) == null) {
+            NameNode.stateChangeLog.warn( "DIR* NameSystem.completeFile: "
+                    + "failed to complete " + src
+                    + " because dir.getFile()==" + dir.getFile(src) 
+                    + " and " + pendingCreates.get(src));
+            return OPERATION_FAILED;
+        } else if (! checkFileProgress(src)) {
+            return STILL_WAITING;
+        }
+        
+        FileUnderConstruction pendingFile = pendingCreates.get(src);
+        Collection<Block> blocks = pendingFile.getBlocks();
+        int nrBlocks = blocks.size();
+        Block pendingBlocks[] = blocks.toArray(new Block[nrBlocks]);
+
+        //
+        // We have the pending blocks, but they won't have
+        // length info in them (as they were allocated before
+        // data-write took place). Find the block stored in
+        // node descriptor.
+        //
+        for (int i = 0; i < nrBlocks; i++) {
+            Block b = pendingBlocks[i];
+            // checkFileProgress() above verified each block has at least
+            // minReplication entries in blocksMap, so get(0) is safe here.
+            List<DatanodeDescriptor> containingNodes = blocksMap.get(b);
+            Block storedBlock = 
+                containingNodes.get(0).getBlock(b);
+            if ( storedBlock != null ) {
+                pendingBlocks[i] = storedBlock;
+            }
+        }
+        
+        //
+        // Now we can add the (name,blocks) tuple to the filesystem
+        //
+        if ( ! dir.addFile(src, pendingBlocks, pendingFile.getReplication())) {
+          return OPERATION_FAILED;
+        }
+
+        // The file is no longer pending
+        pendingCreates.remove(src);
+        NameNode.stateChangeLog.debug(
+             "DIR* NameSystem.completeFile: " + src
+           + " is removed from pendingCreates");
+        for (int i = 0; i < nrBlocks; i++) {
+            pendingCreateBlocks.remove(pendingBlocks[i]);
+        }
+
+        synchronized (leases) {
+            // Drop this create from the holder's lease; discard the lease
+            // entirely once it protects nothing else.
+            Lease lease = leases.get(holder);
+            if (lease != null) {
+                lease.completedCreate(src);
+                if (! lease.hasLocks()) {
+                    leases.remove(holder);
+                    sortedLeases.remove(lease);
+                }
+            }
+        }
+
+        //
+        // REMIND - mjc - this should be done only after we wait a few secs.
+        // The namenode isn't giving datanodes enough time to report the
+        // replicated blocks that are automatically done as part of a client
+        // write.
+        //
+
+        // Now that the file is real, we need to be sure to replicate
+        // the blocks.
+        int numExpectedReplicas = pendingFile.getReplication();
+        for (int i = 0; i < nrBlocks; i++) {
+          Collection<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]);
+          // filter out containingNodes that are marked for decommission.
+          int numCurrentReplica = countContainingNodes(containingNodes);
+
+            // queue under-replicated blocks for the replication monitor
+            if (numCurrentReplica < numExpectedReplicas) {
+                neededReplications.add(
+                      pendingBlocks[i], numCurrentReplica, numExpectedReplicas);
+            }
+        }
+        return COMPLETE_SUCCESS;
+    }
+
+    // shared source of candidate block ids
+    static Random randBlockId = new Random();
+    
+    /**
+     * Allocate a block at the given pending filename
+     */
+    synchronized Block allocateBlock(UTF8 src) {
+        // Keep drawing random ids until one is not already in use.
+        Block b;
+        do {
+            b = new Block(FSNamesystem.randBlockId.nextLong(), 0);
+        } while (dir.isValidBlock(b));
+        // record it against the pending file and the global pending set
+        FileUnderConstruction pending = pendingCreates.get(src);
+        pending.getBlocks().add(b);
+        pendingCreateBlocks.add(b);
+        NameNode.stateChangeLog.debug("BLOCK* NameSystem.allocateBlock: "
+            +src+ ". "+b.getBlockName()+
+            " is created and added to pendingCreates and pendingCreateBlocks" );
+        return b;
+    }
+
+    /**
+     * Check that the indicated file's blocks are present and
+     * replicated.  If not, return false.
+     */
+    synchronized boolean checkFileProgress(UTF8 src) {
+        FileUnderConstruction pending = pendingCreates.get(src);
+        // Every allocated block must be reported by at least
+        // minReplication datanodes.
+        for (Block b : pending.getBlocks()) {
+            Collection<DatanodeDescriptor> holders = blocksMap.get(b);
+            if (holders == null || holders.size() < this.minReplication) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Adds block to list of blocks which will be invalidated on 
+     * specified datanode.
+     */
+    private void addToInvalidates(Block b, DatanodeInfo n) {
+      String storageID = n.getStorageID();
+      Collection<Block> invalidateSet = recentInvalidateSets.get(storageID);
+      if (invalidateSet == null) {
+        // first invalidation queued against this datanode
+        invalidateSet = new ArrayList<Block>();
+        recentInvalidateSets.put(storageID, invalidateSet);
+      }
+      invalidateSet.add(b);
+    }
+
+    /**
+     * Invalidates the given block on the given datanode.
+     */
+    public synchronized void invalidateBlock(Block blk, DatanodeInfo dn)
+        throws IOException {
+      NameNode.stateChangeLog.info("DIR* NameSystem.invalidateBlock: " 
+                                    + blk.getBlockName() + " on " 
+                                    + dn.getName());
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot invalidate block " + blk.getBlockName(), safeMode);
+      }
+
+      Collection<DatanodeDescriptor> containingNodes = blocksMap.get(blk);
+
+      // Check how many copies we have of the block.  If we have at least one
+      // copy on a live node, then we can delete it. 
+      if (containingNodes != null ) {
+        if ((countContainingNodes(containingNodes) > 1) || 
+            ((countContainingNodes(containingNodes) == 1) &&
+             (dn.isDecommissionInProgress() || dn.isDecommissioned()))) {
+          addToInvalidates(blk, dn);
+          removeStoredBlock(blk, getDatanode(dn));
+          NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
+                                        + blk.getBlockName() + " on " 
+                                        + dn.getName() + " listed for deletion.");
+        } else {
+          NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
+                                        + blk.getBlockName() + " on " 
+                                        + dn.getName() + " is the only copy and was not deleted.");
+        }
+      }
+    }
+
+    ////////////////////////////////////////////////////////////////
+    // Here's how to handle block-copy failure during client write:
+    // -- As usual, the client's write should result in a streaming
+    // backup write to a k-machine sequence.
+    // -- If one of the backup machines fails, no worries.  Fail silently.
+    // -- Before client is allowed to close and finalize file, make sure
+    // that the blocks are backed up.  Namenode may have to issue specific backup
+    // commands to make up for earlier datanode failures.  Once all copies
+    // are made, edit namespace and return to client.
+    ////////////////////////////////////////////////////////////////
+
+    /**
+     * Change the indicated filename.
+     */
+    public synchronized boolean renameTo(UTF8 src, UTF8 dst) throws IOException {
+        NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src + " to " + dst );
+        if (isInSafeMode()) {
+          throw new SafeModeException( "Cannot rename " + src, safeMode );
+        }
+        // destination must satisfy the same rules as any new path
+        if (!isValidName(dst.toString())) {
+          throw new IOException("Invalid name: " + dst);
+        }
+        return dir.renameTo(src, dst);
+    }
+
+    /**
+     * Remove the indicated filename from the namespace.  This may
+     * invalidate some blocks that make up the file.
+     */
+    public synchronized boolean delete(UTF8 src) throws IOException {
+        NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src );
+        if (isInSafeMode()) {
+          throw new SafeModeException( "Cannot delete " + src, safeMode );
+        }
+        Block deletedBlocks[] = dir.delete(src);
+        if (deletedBlocks == null) {
+            return false;
+        }
+        // Every replica of every removed block must eventually be erased
+        // from the datanode that holds it.
+        for (Block b : deletedBlocks) {
+            Collection<DatanodeDescriptor> holders = blocksMap.get(b);
+            if (holders == null) {
+                continue;
+            }
+            for (DatanodeDescriptor node : holders) {
+                addToInvalidates(b, node);
+                NameNode.stateChangeLog.debug("BLOCK* NameSystem.delete: "
+                    + b.getBlockName() + " is added to invalidSet of " + node.getName() );
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Return whether the given filename exists
+     * @param src path to test
+     * @return true if src names an existing file or directory
+     */
+    public boolean exists(UTF8 src) {
+        // A name exists if it is either a file or a directory.
+        return dir.getFile(src) != null || dir.isDir(src);
+    }
+
+    /**
+     * Whether the given name is a directory
+     * @param src path to test
+     * @return true if src names a directory
+     */
+    public boolean isDir(UTF8 src) {
+        return dir.isDir(src);
+    }
+
+    /**
+     * Whether the pathname is valid.  Currently prohibits relative paths, 
+     * and names which contain a ":" or "/" 
+     */
+    static boolean isValidName(String src) {
+      // Path must be absolute.
+      if (!src.startsWith(Path.SEPARATOR)) {
+        return false;
+      }
+      // No path component may be "..", ".", or contain ":" or "/".
+      StringTokenizer tokens = new StringTokenizer(src, Path.SEPARATOR);
+      while (tokens.hasMoreTokens()) {
+        String component = tokens.nextToken();
+        boolean forbidden = component.equals("..")
+            || component.equals(".")
+            || component.indexOf(":") >= 0
+            || component.indexOf("/") >= 0;
+        if (forbidden) {
+          return false;
+        }
+      }
+      return true;
+    }
+    
+    /**
+     * Create all the necessary directories
+     */
+    public synchronized boolean mkdirs( String src ) throws IOException {
+        NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src );
+        if (isInSafeMode()) {
+          throw new SafeModeException( "Cannot create directory " + src, safeMode );
+        }
+        if (!isValidName(src)) {
+          throw new IOException("Invalid directory name: " + src);
+        }
+        // dir.mkdirs signals failure via its return value; surface it
+        // as an exception, so reaching the end always means success.
+        if (!dir.mkdirs(src)) {
+          throw new IOException("Invalid directory name: " + src);
+        }
+        return true;
+    }
+
+    /**
+     * Figure out a few hosts that are likely to contain the
+     * block(s) referred to by the given (filename, start, len) tuple.
+     *
+     * @param src filename whose blocks are being located
+     * @param start byte offset of the range within the file
+     * @param len length of the range in bytes
+     * @return one array of datanode hostnames per block overlapping the
+     *         range; an empty outer array for invalid input, an unknown
+     *         file, or a range that falls outside the file
+     */
+    public String[][] getDatanodeHints(String src, long start, long len) {
+        if (start < 0 || len < 0) {
+            return new String[0][];
+        }
+
+        int startBlock = -1;
+        int endBlock = -1;
+        Block blocks[] = dir.getFile( new UTF8( src ));
+
+        if (blocks == null) {                     // no blocks
+            return new String[0][];
+        }
+
+        //
+        // First, figure out where the range falls in
+        // the blocklist.
+        //
+        // Subtract each block's length from the remaining start/end offsets;
+        // the block where an offset first drops to <= 0 is the block that
+        // contains that end of the range.
+        long startpos = start;
+        long endpos = start + len;
+        for (int i = 0; i < blocks.length; i++) {
+            if (startpos >= 0) {
+                startpos -= blocks[i].getNumBytes();
+                if (startpos <= 0) {
+                    startBlock = i;
+                }
+            }
+            if (endpos >= 0) {
+                endpos -= blocks[i].getNumBytes();
+                if (endpos <= 0) {
+                    endBlock = i;
+                    break;
+                }
+            }
+        }
+
+        //
+        // Next, create an array of hosts where each block can
+        // be found
+        //
+        if (startBlock < 0 || endBlock < 0) {
+            return new String[0][];
+        } else {
+          String hosts[][] = new String[(endBlock - startBlock) + 1][];
+            for (int i = startBlock; i <= endBlock; i++) {
+                Collection<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
+                Collection<String> v = new ArrayList<String>();
+                // a block with no known replicas yields an empty host list
+                if (containingNodes != null) {
+                  for (Iterator<DatanodeDescriptor> it =containingNodes.iterator(); it.hasNext();) {
+                    v.add( it.next().getHost() );
+                  }
+                }
+                hosts[i-startBlock] = v.toArray(new String[v.size()]);
+            }
+            return hosts;
+        }
+    }
+
+    /************************************************************
+     * A Lease governs all the locks held by a single client.
+     * For each client there's a corresponding lease, whose
+     * timestamp is updated when the client periodically
+     * checks in.  If the client dies and allows its lease to
+     * expire, all the corresponding locks can be released.
+     *************************************************************/
+    class Lease implements Comparable<Lease> {
+        public UTF8 holder;
+        public long lastUpdate;
+        // paths this client holds read/write locks on
+        private Collection<UTF8> locks = new TreeSet<UTF8>();
+        // paths this client is in the middle of creating
+        private Collection<UTF8> creates = new TreeSet<UTF8>();
+
+        public Lease(UTF8 holder) {
+            this.holder = holder;
+            renew();
+        }
+        /** Stamp the lease with the current time. */
+        public void renew() {
+            lastUpdate = now();
+        }
+        /**
+         * Returns true if the Hard Limit Timer has expired
+         */
+        public boolean expiredHardLimit() {
+            return now() - lastUpdate > LEASE_HARDLIMIT_PERIOD;
+        }
+        /**
+         * Returns true if the Soft Limit Timer has expired
+         */
+        public boolean expiredSoftLimit() {
+            return now() - lastUpdate > LEASE_SOFTLIMIT_PERIOD;
+        }
+        /** Record that the holder obtained a lock on src. */
+        public void obtained(UTF8 src) {
+            locks.add(src);
+        }
+        /** Record that the holder released its lock on src. */
+        public void released(UTF8 src) {
+            locks.remove(src);
+        }
+        /** Record that the holder started creating src. */
+        public void startedCreate(UTF8 src) {
+            creates.add(src);
+        }
+        /** Forget the pending create of src; true if it was registered. */
+        public boolean completedCreate(UTF8 src) {
+            return creates.remove(src);
+        }
+        /** True while the holder still owns any lock or pending create. */
+        public boolean hasLocks() {
+            return !locks.isEmpty() || !creates.isEmpty();
+        }
+        /** Release every lock and pending create owned by this holder. */
+        public void releaseLocks() {
+            for (UTF8 src : locks) {
+                internalReleaseLock(src, holder);
+            }
+            locks.clear();
+            for (UTF8 src : creates) {
+                internalReleaseCreate(src, holder);
+            }
+            creates.clear();
+        }
+
+        /**
+         */
+        public String toString() {
+            return "[Lease.  Holder: " + holder.toString() + ", heldlocks: " +
+                   locks.size() + ", pendingcreates: " + creates.size() + "]";
+        }
+
+        /**
+         * Order leases oldest-first; ties are broken on the holder name so
+         * the ordering distinguishes distinct holders.
+         */
+        public int compareTo(Lease o) {
+            if (lastUpdate != o.lastUpdate) {
+                return lastUpdate < o.lastUpdate ? -1 : 1;
+            }
+            return holder.compareTo(o.holder);
+        }
+    }
+    /******************************************************
+     * LeaseMonitor checks for leases that have expired,
+     * and disposes of them.
+     ******************************************************/
+    class LeaseMonitor implements Runnable {
+        /**
+         * Polls every 2 seconds, expiring leases oldest-first until the
+         * head of sortedLeases is no longer past its hard limit.
+         */
+        public void run() {
+            while (fsRunning) {
+                // Lock order: the FSNamesystem monitor first, then leases --
+                // the same order used by the synchronized lease methods.
+                synchronized (FSNamesystem.this) {
+                    synchronized (leases) {
+                        Lease top;
+                        // sortedLeases is ordered by lastUpdate, so once the
+                        // first lease is unexpired the rest are too.
+                        while ((sortedLeases.size() > 0) &&
+                               ((top = sortedLeases.first()) != null)) {
+                            if (top.expiredHardLimit()) {
+                                top.releaseLocks();
+                                leases.remove(top.holder);
+                                LOG.info("Removing lease " + top + ", leases remaining: " + sortedLeases.size());
+                                if (!sortedLeases.remove(top)) {
+                                    LOG.info("Unknown failure trying to remove " + top + " from lease set.");
+                                }
+                            } else {
+                                break;
+                            }
+                        }
+                    }
+                }
+                try {
+                    Thread.sleep(2000);
+                } catch (InterruptedException ie) {
+                    // NOTE(review): the interrupt is swallowed and the thread's
+                    // interrupt status is not restored; the loop exits only
+                    // when fsRunning becomes false.
+                }
+            }
+        }
+    }
+
+    /**
+     * Get a lock (perhaps exclusive) on the given file
+     */
+    /** @deprecated */ @Deprecated
+    public synchronized int obtainLock( UTF8 src, 
+                                        UTF8 holder, 
+                                        boolean exclusive) throws IOException {
+        if (isInSafeMode()) {
+          throw new SafeModeException( "Cannot lock file " + src, safeMode );
+        }
+        int result = dir.obtainLock(src, holder, exclusive);
+        if (result != COMPLETE_SUCCESS) {
+            return result;
+        }
+        synchronized (leases) {
+            Lease lease = leases.get(holder);
+            if (lease == null) {
+                lease = new Lease(holder);
+                leases.put(holder, lease);
+            } else {
+                // remove before renewing so sortedLeases stays ordered
+                sortedLeases.remove(lease);
+                lease.renew();
+            }
+            sortedLeases.add(lease);
+            lease.obtained(src);
+        }
+        return result;
+    }
+
+    /**
+     * Release the lock on the given file
+     */
+    /** @deprecated */ @Deprecated
+    public synchronized int releaseLock(UTF8 src, UTF8 holder) {
+        int result = internalReleaseLock(src, holder);
+        if (result != COMPLETE_SUCCESS) {
+            return result;
+        }
+        synchronized (leases) {
+            Lease lease = leases.get(holder);
+            if (lease != null) {
+                lease.released(src);
+                if (!lease.hasLocks()) {
+                    // the holder owns nothing anymore; forget the lease
+                    leases.remove(holder);
+                    sortedLeases.remove(lease);
+                }
+            }
+        }
+        return result;
+    }
+
+    /** Delegate the actual lock release to the directory tree. */
+    private int internalReleaseLock(UTF8 src, UTF8 holder) {
+        return dir.releaseLock(src, holder);
+    }
+
+    /**
+     * Release a pending file creation lock.
+     * Removes the file from pendingCreates and drops all of its allocated
+     * blocks from pendingCreateBlocks; logs a warning if nothing was pending.
+     * @param src The filename
+     * @param holder The datanode that was creating the file
+     */
+    private void internalReleaseCreate(UTF8 src, UTF8 holder) {
+      FileUnderConstruction v = pendingCreates.remove(src);
+      if (v != null) {
+         NameNode.stateChangeLog.debug(
+                      "DIR* NameSystem.internalReleaseCreate: " + src
+                    + " is removed from pendingCreates for "
+                    + holder + " (failure)");
+        // every block allocated for the abandoned file is no longer pending
+        for (Iterator<Block> it2 = v.getBlocks().iterator(); it2.hasNext(); ) {
+          Block b = it2.next();
+          pendingCreateBlocks.remove(b);
+        }
+      } else {
+          // fixed typo in the log message: "pedingCreates" -> "pendingCreates"
+          NameNode.stateChangeLog.warn("DIR* NameSystem.internalReleaseCreate: "
+                 + "attempt to release a create lock on "+ src.toString()
+                 + " that was not in pendingCreates");
+      }
+    }
+
+    /**
+     * Renew the lease(s) held by the given client
+     */
+    public void renewLease(UTF8 holder) throws IOException {
+        synchronized (leases) {
+            if (isInSafeMode()) {
+              throw new SafeModeException( "Cannot renew lease for " + holder, safeMode );
+            }
+            Lease lease = leases.get(holder);
+            if (lease == null) {
+                return;
+            }
+            // remove/re-add so the lease is re-sorted by its new timestamp
+            sortedLeases.remove(lease);
+            lease.renew();
+            sortedLeases.add(lease);
+        }
+    }
+
+    /**
+     * Get a listing of all files at 'src'.  The Object[] array
+     * exists so we can return file attributes (soon to be implemented)
+     *
+     * @param src path to list
+     * @return the directory's entries as produced by the directory tree
+     */
+    public DFSFileInfo[] getListing(UTF8 src) {
+        // simple delegation; all listing logic lives in FSDirectory
+        return dir.getListing(src);
+    }
+
+    /////////////////////////////////////////////////////////
+    //
+    // These methods are called by datanodes
+    //
+    /////////////////////////////////////////////////////////
+    /**
+     * Register Datanode.
+     * <p>
+     * The purpose of registration is to identify whether the new datanode
+     * serves a new data storage, and will report new data block copies,
+     * which the namenode was not aware of; or the datanode is a replacement
+     * node for the data storage that was previously served by a different
+     * or the same (in terms of host:port) datanode.
+     * The data storages are distinguished by their storageIDs. When a new
+     * data storage is reported the namenode issues a new unique storageID.
+     * <p>
+     * Finally, the namenode returns its namespaceID as the registrationID
+     * for the datanodes. 
+     * namespaceID is a persistent attribute of the name space.
+     * The registrationID is checked every time the datanode is communicating
+     * with the namenode. 
+     * Datanodes with inappropriate registrationID are rejected.
+     * If the namenode stops, and then restarts it can restore its 
+     * namespaceID and will continue serving the datanodes that has previously
+     * registered with the namenode without restarting the whole cluster.
+     * 
+     * @see DataNode#register()
+     * @author Konstantin Shvachko
+     */
+    public synchronized void registerDatanode( DatanodeRegistration nodeReg,
+                                               String networkLocation
+                                              ) throws IOException {
+      NameNode.stateChangeLog.info(
+          "BLOCK* NameSystem.registerDatanode: "
+          + "node registration from " + nodeReg.getName()
+          + " storage " + nodeReg.getStorageID() );
+
+      nodeReg.registrationID = getRegistrationID();
+      // nodeS: descriptor known under this storageID;
+      // nodeN: descriptor known under this host:port name.
+      DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
+      DatanodeDescriptor nodeN = getDatanodeByName( nodeReg.getName() );
+      
+      if( nodeN != null && nodeN != nodeS ) {
+          NameNode.LOG.info( "BLOCK* NameSystem.registerDatanode: "
+                  + "node from name: " + nodeN.getName() );
+        // nodeN previously served a different data storage, 
+        // which is not served by anybody anymore.
+        removeDatanode( nodeN );
+        // physically remove node from datanodeMap
+        wipeDatanode( nodeN );
+        // and log removal
+        getEditLog().logRemoveDatanode( nodeN );
+        nodeN = null;
+      }
+
+      if ( nodeS != null ) {
+        if( nodeN == nodeS ) {
+          // The same datanode has been just restarted to serve the same data 
+          // storage. We do not need to remove old data blocks, the delta will
+          // be calculated on the next block report from the datanode
+          NameNode.stateChangeLog.debug("BLOCK* NameSystem.registerDatanode: "
+                                        + "node restarted." );
+        } else {
+          // nodeS is found
+          // The registering datanode is a replacement node for the existing 
+          // data storage, which from now on will be served by a new node.
+          NameNode.stateChangeLog.debug(
+            "BLOCK* NameSystem.registerDatanode: "
+            + "node " + nodeS.getName()
+            + " is replaced by " + nodeReg.getName() + "." );
+        }
+        // re-key the existing descriptor under the new registration info
+        getEditLog().logRemoveDatanode( nodeS );
+        // update cluster map
+        clusterMap.remove( nodeS );
+        nodeS.updateRegInfo( nodeReg );
+        nodeS.setNetworkLocation( networkLocation );
+        clusterMap.add( nodeS );
+        getEditLog().logAddDatanode( nodeS );
+        
+        // also treat the registration message as a heartbeat
+        synchronized( heartbeats ) {
+            heartbeats.add( nodeS );
+            //update its timestamp
+            nodeS.updateHeartbeat( 0L, 0L, 0);
+            nodeS.isAlive = true;
+        }
+        return;
+      } 
+
+      // this is a new datanode serving a new data storage
+      if( nodeReg.getStorageID().equals("") ) {
+        // this data storage has never been registered
+        // it is either empty or was created by pre-storageID version of DFS
+        nodeReg.storageID = newStorageID();
+        NameNode.stateChangeLog.debug(
+            "BLOCK* NameSystem.registerDatanode: "
+            + "new storageID " + nodeReg.getStorageID() + " assigned." );
+      }
+      // register new datanode
+      DatanodeDescriptor nodeDescr 
+                  = new DatanodeDescriptor( nodeReg, networkLocation );
+      unprotectedAddDatanode( nodeDescr );
+      getEditLog().logAddDatanode( nodeDescr );
+      
+      // also treat the registration message as a heartbeat
+      synchronized( heartbeats ) {
+          heartbeats.add( nodeDescr );
+          nodeDescr.isAlive = true;
+          // no need to update its timestamp
+          // because it is done when the descriptor is created
+      }
+      return;
+    }
+    
+    /**
+     * Get registrationID for datanodes based on the namespaceID.
+     * 
+     * @see #registerDatanode(DatanodeRegistration)
+     * @see FSImage#newNamespaceID()
+     * @return registration ID
+     */
+    public String getRegistrationID() {
+      return "NS" + Integer.toString( dir.namespaceID );
+    }
+    
+    /**
+     * Generate a new storage ID ("DS" + random int) that is not already
+     * a key in datanodeMap.
+     * 
+     * @return unique storage ID
+     * 
+     * Note: collisions are still possible if somebody tries to bring in
+     * a data storage from a different cluster.
+     */
+    private String newStorageID() {
+      for (;;) {
+        String candidate = "DS" + r.nextInt();
+        if (datanodeMap.get(candidate) == null) {
+          return candidate;
+        }
+      }
+    }
+    
+    /**
+     * A datanode is considered dead when its last heartbeat is older
+     * than heartbeatExpireInterval.
+     */
+    private boolean isDatanodeDead(DatanodeDescriptor node) {
+      long oldestAllowed = System.currentTimeMillis() - heartbeatExpireInterval;
+      return node.getLastUpdate() < oldestAllowed;
+    }
+    
+    /**
+     * The given node has reported in.  This method should:
+     * 1) Record the heartbeat, so the datanode isn't timed out
+     * 2) Adjust usage stats for future block allocation
+     * 
+     * If a substantial amount of time passed since the last datanode 
+     * heartbeat then request an immediate block report.  
+     * 
+     * @param nodeID identity of the reporting datanode
+     * @param capacity total raw capacity the node reports, in bytes
+     * @param remaining free raw space the node reports, in bytes
+     * @param xceiverCount active transceiver count reported by the node
+     * @param xmitsInProgress transfers the node already has in flight
+     * @param xferResults out-parameter: pending replication work, if any
+     * @param deleteList out-parameter: blocks the node should invalidate
+     * @return true if block report is required or false otherwise.
+     * @throws IOException
+     */
+    public boolean gotHeartbeat( DatanodeID nodeID,
+                                 long capacity, 
+                                 long remaining,
+                                 int xceiverCount,
+                                 int xmitsInProgress,
+                                 Object[] xferResults,
+                                 Object deleteList[]
+                                 ) throws IOException {
+      // Lock order: heartbeats then datanodeMap -- the same order used
+      // in heartbeatCheck(), so the two cannot deadlock.
+      synchronized (heartbeats) {
+        synchronized (datanodeMap) {
+          DatanodeDescriptor nodeinfo;
+          try {
+            nodeinfo = getDatanode( nodeID );
+            if (nodeinfo == null ) {
+                // Unknown node: tell it a block report is required.
+                return true;
+            }
+          } catch(UnregisteredDatanodeException e) {
+              // Node identity does not match our records; same response.
+              return true;
+          }
+          
+          if( !nodeinfo.isAlive ) {
+              // Node was previously declared dead; require a block
+              // report to resynchronize its state.
+              return true;
+          } else {
+              // Back the node's old figures out of the cluster totals,
+              // record the new heartbeat values, then add them back in.
+              updateStats(nodeinfo, false);
+              nodeinfo.updateHeartbeat(capacity, remaining, xceiverCount);
+              updateStats(nodeinfo, true);
+              //
+              // Extract pending replication work or block invalidation
+              // work from the datanode descriptor
+              //
+              nodeinfo.getReplicationSets(this.maxReplicationStreams - 
+                                          xmitsInProgress, xferResults); 
+              if (xferResults[0] == null) {
+                // Only hand out invalidation work when there is no
+                // replication work for this heartbeat.
+                nodeinfo.getInvalidateBlocks(FSConstants.BLOCK_INVALIDATE_CHUNK,
+                                             deleteList);
+              }
+              return false;
+          }
+        }
+      }
+    }
+
+    /**
+     * Add (isAdded == true) or subtract this node's capacity, remaining
+     * space and transceiver load from the cluster-wide totals.
+     * The totals are protected by the heartbeat lock, which the caller
+     * must hold.
+     */
+    private void updateStats(DatanodeDescriptor node, boolean isAdded) {
+      assert(Thread.holdsLock(heartbeats));
+      int sign = isAdded ? 1 : -1;
+      totalCapacity += sign * node.getCapacity();
+      totalRemaining += sign * node.getRemaining();
+      totalLoad += sign * node.getXceiverCount();
+    }
+    /**
+     * Background daemon that invokes heartbeatCheck() every
+     * heartbeatRecheckInterval milliseconds for as long as the
+     * namesystem is running.
+     */
+    class HeartbeatMonitor implements Runnable {
+        public void run() {
+            while (fsRunning) {
+                heartbeatCheck();
+                try {
+                    Thread.sleep(heartbeatRecheckInterval);
+                } catch (InterruptedException ignored) {
+                    // Woken early -- loop around and check again now.
+                }
+            }
+        }
+    }
+
+    /**
+     * Background daemon that periodically computes replication and
+     * invalidation work for datanodes via computeDatanodeWork().
+     */
+    class ReplicationMonitor implements Runnable {
+      public void run() {
+        while (fsRunning) {
+          try {
+            computeDatanodeWork();
+            Thread.sleep(replicationRecheckInterval);
+          } catch (InterruptedException ignored) {
+            // Woken early -- just go around the loop again.
+          } catch (IOException e) {
+            LOG.warn("ReplicationMonitor thread received exception. " + e);
+          }
+        }
+      }
+    }
+
+    /**
+     * Look at a few datanodes and compute any replication work that 
+     * can be scheduled on them. The datanode will be informed of this
+     * work at the next heartbeat.
+     *
+     * The scan resumes where the previous invocation stopped (replIndex
+     * is kept across calls) so that, over time, every datanode in the
+     * heartbeat list gets considered.  The loop stops early once work
+     * has been found for REPL_WORK_PER_ITERATION percent of the nodes.
+     */
+    void computeDatanodeWork() throws IOException {
+      int numiter = 0;     // nodes examined so far in this invocation
+      int foundwork = 0;   // nodes for which work was computed
+      int hsize = 0;       // heartbeat-list size, re-sampled each pass
+
+      while (true) {
+        DatanodeDescriptor node = null;
+
+        //
+        // pick the datanode that was the last one in the
+        // previous invocation of this method.
+        //
+        synchronized (heartbeats) {
+          hsize = heartbeats.size();
+          if (numiter++ >= hsize) {
+            break;  // every live node has been examined once
+          }
+          if (replIndex >= hsize) {
+            replIndex = 0;  // wrap around; the list may have shrunk
+          }
+          node = heartbeats.get(replIndex);
+          replIndex++;
+        }
+
+        //
+        // Is there replication work to be computed for this datanode?
+        //
+        int precomputed = node.getNumberOfBlocksToBeReplicated();
+        int needed = this.maxReplicationStreams - precomputed;
+        boolean doReplication = false;
+        boolean doInvalidation = false;
+        if (needed > 0) {
+          //
+          // Compute replication work and store work into the datanode
+          //
+          Object replsets[] = pendingTransfers(node, needed);
+          if (replsets != null) {
+            doReplication = true;
+            addBlocksToBeReplicated(node, (Block[])replsets[0], 
+                                   (DatanodeDescriptor[][])replsets[1]);
+          }
+        }
+        if (!doReplication) {
+          //
+          // Determine if block deletion is pending for this datanode
+          // (invalidation is only scheduled when there is no
+          // replication work for the node).
+          //
+          Block blocklist[] = blocksToInvalidate(node);
+          if (blocklist != null) {
+            doInvalidation = true;
+            addBlocksToBeInvalidated(node, blocklist);
+          }
+        }
+        if (doReplication || doInvalidation) {
+          //
+          // If we have already computed work for a predefined
+          // number of datanodes in this iteration, then relax
+          //
+          if (foundwork > ((hsize * REPL_WORK_PER_ITERATION)/100)) {
+            break;
+          }
+          foundwork++;
+        } else {
+          //
+          // See if the decommissioned node has finished moving all
+          // its datablocks to another replica. This is a loose
+          // heuristic to determine when a decommission is really over.
+          //
+          checkDecommissionState(node);
+        }
+      }
+    }
+
+    /**
+     * Queue additional replication work on the given datanode.  The node
+     * is re-resolved while holding the FSNamesystem lock before it is
+     * mutated; an unknown node is silently ignored.
+     */
+    synchronized void addBlocksToBeReplicated(DatanodeDescriptor node, 
+                                 Block[] blocklist,
+                                 DatanodeDescriptor[][] targets) 
+                                 throws IOException {
+      DatanodeDescriptor registered = getDatanode(node);
+      if (registered == null) {
+        return;
+      }
+      registered.addBlocksToBeReplicated(blocklist, targets);
+    }
+
+    /**
+     * Queue additional block-invalidation work on the given datanode.
+     * The node is re-resolved while holding the FSNamesystem lock; an
+     * unknown node is silently ignored.
+     */
+    synchronized void addBlocksToBeInvalidated(DatanodeDescriptor node, 
+                                 Block[] blocklist) throws IOException {
+      DatanodeDescriptor registered = getDatanode(node);
+      if (registered == null) {
+        return;
+      }
+      registered.addBlocksToBeInvalidated(blocklist);
+    }
+
+    /**
+     * Remove a datanode descriptor.
+     * @param nodeID datanode ID
+     * @author hairong
+     */
+    synchronized public void removeDatanode( DatanodeID nodeID ) 
+    throws IOException {
+      DatanodeDescriptor nodeInfo = getDatanode( nodeID );
+      if (nodeInfo != null) {
+        removeDatanode( nodeInfo );
+      } else {
+          // BUG FIX: nodeInfo is null on this branch, so the original
+          // nodeInfo.getName() would throw NullPointerException instead
+          // of logging; use the caller-supplied nodeID for the name.
+          NameNode.stateChangeLog.warn("BLOCK* NameSystem.removeDatanode: "
+                  + nodeID.getName() + " does not exist");
+      }
+    }
+  
+  /**
+   * remove a datanode descriptor
+   * @param nodeInfo datanode descriptor
+   * @author hairong
+   */
+    private void removeDatanode( DatanodeDescriptor nodeInfo ) {
+      // Take the node out of the heartbeat set and back its usage out
+      // of the cluster totals.
+      // NOTE(review): heartbeats is mutated here without synchronizing
+      // on it; some callers (heartbeatCheck) hold the heartbeats lock,
+      // but removeDatanode(DatanodeID) does not appear to -- confirm.
+      if (nodeInfo.isAlive) {
+        updateStats(nodeInfo, false);
+        heartbeats.remove(nodeInfo);
+        nodeInfo.isAlive = false;
+      }
+
+      // Detach every block replica this node was holding from the
+      // block->datanode map (may trigger re-replication).
+      for (Iterator<Block> it = nodeInfo.getBlockIterator(); it.hasNext(); ) {
+          removeStoredBlock(it.next(), nodeInfo);
+      }
+      unprotectedRemoveDatanode(nodeInfo);
+      clusterMap.remove(nodeInfo);
+    }
+
+    /**
+     * Mark the descriptor as carrying no blocks when a node goes out of
+     * service.  The entry deliberately remains in datanodeMap; see
+     * wipeDatanode for physical removal.
+     */
+    void unprotectedRemoveDatanode( DatanodeDescriptor nodeDescr ) {
+      nodeDescr.resetBlocks();
+      NameNode.stateChangeLog.debug(
+          "BLOCK* NameSystem.unprotectedRemoveDatanode: "
+          + nodeDescr.getName() + " is out of service now.");
+    }
+    
+    /**
+     * Insert the descriptor into datanodeMap (keyed by storage ID) and
+     * into the network topology.
+     */
+    void unprotectedAddDatanode( DatanodeDescriptor nodeDescr ) {
+      datanodeMap.put(nodeDescr.getStorageID(), nodeDescr);
+      clusterMap.add(nodeDescr);
+      NameNode.stateChangeLog.debug(
+          "BLOCK* NameSystem.unprotectedAddDatanode: " + "node "
+          + nodeDescr.getName() + " is added to datanodeMap." );
+    }
+
+    
+    /**
+     * Physically remove node from datanodeMap.
+     * 
+     * @param nodeID node
+     */
+    void wipeDatanode( DatanodeID nodeID ) {
+      datanodeMap.remove(nodeID.getStorageID());
+      NameNode.stateChangeLog.debug(
+          "BLOCK* NameSystem.wipeDatanode: "
+          + nodeID.getName() + " storage " + nodeID.getStorageID() 
+          + " is removed from datanodeMap.");
+    }
+    
+    /** @return the edit log owned by this namesystem's FSImage. */
+    private FSEditLog getEditLog() {
+      return dir.fsImage.getEditLog();
+    }
+
+    /**
+     * Check if there are any expired heartbeats, and if so,
+     * whether any blocks have to be re-replicated.
+     * While removing dead datanodes, make sure that only one datanode is marked
+     * dead at a time within the synchronized section. Otherwise, a cascading
+     * effect causes more datanodes to be declared dead.
+     */
+    void heartbeatCheck() {
+      boolean allAlive = false;
+      while (!allAlive) {
+        boolean foundDead = false;
+        DatanodeID nodeID = null;
+
+        // locate the first dead node.
+        // Only the heartbeats lock is held for this scan; the heavier
+        // locks are taken below, once per dead node.
+        synchronized(heartbeats) {
+            for (Iterator<DatanodeDescriptor> it = heartbeats.iterator();
+            it.hasNext();) {
+              DatanodeDescriptor nodeInfo = it.next();
+              if (isDatanodeDead(nodeInfo)) {
+                foundDead = true;
+                nodeID = nodeInfo;
+                break;
+              }
+            }
+        }
+
+        // acquire the fsnamesystem lock, and then remove the dead node.
+        // Lock order here is: this -> heartbeats -> datanodeMap.
+        if (foundDead) {
+          synchronized (this) {
+            synchronized(heartbeats) {
+              synchronized (datanodeMap) {
+                DatanodeDescriptor nodeInfo = null;
+                try {
+                  nodeInfo = getDatanode(nodeID);
+                } catch (IOException e) {
+                  // Node vanished between the scan and this lookup.
+                  nodeInfo = null;
+                }
+                // Re-check liveness under the locks: the node may have
+                // heartbeated in the window since the scan above.
+                if (nodeInfo != null && isDatanodeDead(nodeInfo)) {
+                  NameNode.stateChangeLog.info("BLOCK* NameSystem.heartbeatCheck: "
+                    + "lost heartbeat from " + nodeInfo.getName());
+                  removeDatanode(nodeInfo);
+                }
+              }
+            }
+          }
+        }
+        allAlive = ! foundDead;
+      }
+    }
+    
+    /**
+     * The given node is reporting all its blocks.  Use this info to 
+     * update the (machine-->blocklist) and (block-->machinelist) tables.
+     *
+     * The report is merged against the node's current block list in a
+     * single linear pass (both sequences are in sorted order), yielding
+     * the sets of blocks to add and to remove.
+     *
+     * @param nodeID the reporting datanode
+     * @param newReport sorted array of all blocks the node claims to hold
+     * @return blocks the node should delete because they are no longer valid
+     */
+    public synchronized Block[] processReport(DatanodeID nodeID, 
+                                              Block newReport[]
+                                            ) throws IOException {
+        // NOTE(review): this debug line dereferences newReport, yet the
+        // null check below suggests callers may pass null -- confirm.
+        NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
+          +"from "+nodeID.getName()+" "+newReport.length+" blocks" );
+        DatanodeDescriptor node = getDatanode( nodeID );
+
+        //
+        // Modify the (block-->datanode) map, according to the difference
+        // between the old and new block report.
+        //
+        int newPos = 0;
+        Iterator<Block> iter = node.getBlockIterator();
+        Block oldblk = iter.hasNext() ? iter.next() : null;
+        Block newblk = (newReport != null && newReport.length > 0) ? 
+                        newReport[0]	: null;
+
+        // common case is that most of the blocks from the datanode
+        // matches blocks in datanode descriptor.                
+        Collection<Block> toRemove = new LinkedList<Block>();
+        Collection<Block> toAdd = new LinkedList<Block>();
+        
+        // Sorted-merge: equal blocks advance both cursors; a block only
+        // in the old list is scheduled for removal; a block only in the
+        // new report is scheduled for addition.
+        while (oldblk != null || newblk != null) {
+           
+            int cmp = (oldblk == null) ? 1 : 
+                       ((newblk == null) ? -1 : oldblk.compareTo(newblk));
+
+            if (cmp == 0) {
+                // Do nothing, blocks are the same
+                newPos++;
+                oldblk = iter.hasNext() ? iter.next() : null;
+                newblk = (newPos < newReport.length)
+                         ? newReport[newPos] : null;
+            } else if (cmp < 0) {
+                // The old report has a block the new one does not
+                toRemove.add(oldblk);
+                oldblk = iter.hasNext() ? iter.next() : null;
+            } else {
+                // The new report has a block the old one does not
+                toAdd.add(newblk);
+                newPos++;
+                newblk = (newPos < newReport.length)
+                         ? newReport[newPos] : null;
+            }
+        }
+        
+        // Apply the computed differences to both maps.
+        for ( Iterator<Block> i = toRemove.iterator(); i.hasNext(); ) {
+            Block b = i.next();
+            removeStoredBlock( b, node );
+            node.removeBlock( b );
+        }
+        for ( Iterator<Block> i = toAdd.iterator(); i.hasNext(); ) {
+            Block b = i.next();
+            node.addBlock( addStoredBlock(b, node) );
+        }
+        
+        //
+        // We've now completely updated the node's block report profile.
+        // We now go through all its blocks and find which ones are invalid,
+        // no longer pending, or over-replicated.
+        //
+        // (Note it's not enough to just invalidate blocks at lease expiry 
+        // time; datanodes can go down before the client's lease on 
+        // the failed file expires and miss the "expire" event.)
+        //
+        // This function considers every block on a datanode, and thus
+        // should only be invoked infrequently.
+        //
+        Collection<Block> obsolete = new ArrayList<Block>();
+        for (Iterator<Block> it = node.getBlockIterator(); it.hasNext(); ) {
+            Block b = it.next();
+
+            if (! dir.isValidBlock(b) && ! pendingCreateBlocks.contains(b)) {
+                obsolete.add(b);
+                NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
+                        +"ask "+nodeID.getName()+" to delete "+b.getBlockName() );
+            }
+        }
+        return (Block[]) obsolete.toArray(new Block[obsolete.size()]);
+    }
+
+    /**
+     * Modify (block-->datanode) map.  Remove block from set of 
+     * needed replications if this takes care of the problem.
+     * @return the block that is stored in blockMap.
+     */
+    synchronized Block addStoredBlock(Block block, DatanodeDescriptor node) {
+        List<DatanodeDescriptor> containingNodes = blocksMap.get(block);
+        if (containingNodes == null) {
+            //Create an arraylist with the current replication factor
+            FSDirectory.INode inode = dir.getFileByBlock(block);
+            int replication = (inode != null) ? 
+                              inode.getReplication() : defaultReplication;
+            containingNodes = new ArrayList<DatanodeDescriptor>(replication);
+            blocksMap.put(block, containingNodes);
+        } else {
+            // Prefer the Block instance the system already knows, so a
+            // length update here is visible everywhere it is referenced.
+            // NOTE(review): get(0) assumes lists in blocksMap are never
+            // empty; removeStoredBlock appears to uphold this by deleting
+            // the entry when its list empties -- confirm.
+            Block storedBlock = 
+                containingNodes.get(0).getBlock(block);
+            // update stored block's length.
+            if ( storedBlock != null ) {
+                if ( block.getNumBytes() > 0 ) {
+                    storedBlock.setNumBytes( block.getNumBytes() );
+                }
+                block = storedBlock;
+            }
+        }
+        int curReplicaDelta = 0;
+        if (! containingNodes.contains(node)) {
+            containingNodes.add(node);
+            curReplicaDelta = 1;
+            // 
+            // Hairong: I would prefer to set the level of next logrecord
+            // to be debug.
+            // But at startup time, because too many new blocks come in
+            // they simply take up all the space in the log file 
+            // So I set the level to be trace
+            //
+            NameNode.stateChangeLog.trace("BLOCK* NameSystem.addStoredBlock: "
+                    +"blockMap updated: "+node.getName()+" is added to "+block.getBlockName() );
+        } else {
+            NameNode.stateChangeLog.warn("BLOCK* NameSystem.addStoredBlock: "
+                    + "Redundant addStoredBlock request received for " 
+                    + block.getBlockName() + " on " + node.getName());
+        }
+
+        FSDirectory.INode fileINode = dir.getFileByBlock(block);
+        if( fileINode == null )  // block does not belong to any file
+            return block;
+        
+        // filter out containingNodes that are marked for decommission.
+        int numCurrentReplica = countContainingNodes(containingNodes);
+        
+        // check whether safe replication is reached for the block
+        // only if it is a part of a files
+        incrementSafeBlockCount( numCurrentReplica );
+        
+        // handle underReplication/overReplication
+        short fileReplication = fileINode.getReplication();
+        if(neededReplications.contains(block)) {
+            neededReplications.update(block, curReplicaDelta, 0);
+        }
+        if (numCurrentReplica >= fileReplication ) {
+            // Enough replicas exist; this block is no longer pending.
+            pendingReplications.remove(block);
+        }        
+        proccessOverReplicatedBlock( block, fileReplication );
+        return block;
+    }
+    
+    /**
+     * Find how many of the containing nodes are "extra", if any.
+     * If there are any extras, call chooseExcessReplicates() to
+     * mark them in the excessReplicateMap.
+     */
+    private void proccessOverReplicatedBlock( Block block, short replication ) {
+      Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
+      if( containingNodes == null ) {
+        return;
+      }
+      // Deletion candidates: replicas not already marked excess and not
+      // hosted on decommissioning/decommissioned nodes.
+      Collection<DatanodeDescriptor> nonExcess =
+          new ArrayList<DatanodeDescriptor>();
+      for (DatanodeDescriptor cur : containingNodes) {
+        Collection<Block> excessBlocks =
+            excessReplicateMap.get(cur.getStorageID());
+        boolean alreadyExcess =
+            excessBlocks != null && excessBlocks.contains(block);
+        if (!alreadyExcess
+            && !cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
+          nonExcess.add(cur);
+        }
+      }
+      chooseExcessReplicates(nonExcess, block, replication);
+    }
+
+    /**
+     * We want "replication" replicates for the block, but we now have too many.  
+     * In this method, copy enough nodes from 'srcNodes' into 'dstNodes' such that:
+     *
+     * srcNodes.size() - dstNodes.size() == replication
+     *
+     * We pick node with least free space
+     * In the future, we might enforce some kind of policy 
+     * (like making sure replicates are spread across racks).
+     */
+    void chooseExcessReplicates(Collection<DatanodeDescriptor> nonExcess, 
+                                Block b, short replication) {
+        // Each pass removes one replica: scan for the candidate with the
+        // least free space, then record it in both bookkeeping maps.
+        while (nonExcess.size() - replication > 0) {
+            DatanodeInfo cur = null;
+            long minSpace = Long.MAX_VALUE;
+            
+            for (Iterator<DatanodeDescriptor> iter = nonExcess.iterator(); iter.hasNext();) {
+                DatanodeInfo node = iter.next();
+                long free = node.getRemaining();
+                
+                if(minSpace > free) {
+                    minSpace = free;
+                    cur = node;
+                }
+            }
+            
+            nonExcess.remove(cur);
+
+            Collection<Block> excessBlocks = excessReplicateMap.get(cur.getStorageID());
+            if (excessBlocks == null) {
+                excessBlocks = new TreeSet<Block>();
+                excessReplicateMap.put(cur.getStorageID(), excessBlocks);
+            }
+            excessBlocks.add(b);
+            NameNode.stateChangeLog.debug("BLOCK* NameSystem.chooseExcessReplicates: "
+                    +"("+cur.getName()+", "+b.getBlockName()+") is added to excessReplicateMap" );
+
+            //
+            // The 'excessblocks' tracks blocks until we get confirmation
+            // that the datanode has deleted them; the only way we remove them
+            // is when we get a "removeBlock" message.  
+            //
+            // The 'invalidate' list is used to inform the datanode the block 
+            // should be deleted.  Items are removed from the invalidate list
+            // upon giving instructions to the namenode.
+            //
+            Collection<Block> invalidateSet = recentInvalidateSets.get(cur.getStorageID());
+            if (invalidateSet == null) {
+                invalidateSet = new ArrayList<Block>();
+                recentInvalidateSets.put(cur.getStorageID(), invalidateSet);
+            }
+            invalidateSet.add(b);
+            NameNode.stateChangeLog.debug("BLOCK* NameSystem.chooseExcessReplicates: "
+                    +"("+cur.getName()+", "+b.getBlockName()+") is added to recentInvalidateSets" );
+        }
+    }
+
+    /**
+     * Modify (block-->datanode) map.  Possibly generate 
+     * replication tasks, if the removed block is still valid.
+     */
+    synchronized void removeStoredBlock(Block block, DatanodeDescriptor node) {
+        NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
+                +block.getBlockName() + " from "+node.getName() );
+        Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
+        if (containingNodes == null || ! containingNodes.contains(node)) {
+          // Nothing to do: this replica was never (or is no longer) recorded.
+          NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
+            +block.getBlockName()+" has already been removed from node "+node );
+          return;
+        }
+        containingNodes.remove(node);
+        
+        // filter out containingNodes that are marked for decommission.
+        int numCurrentReplica = countContainingNodes(containingNodes);
+
+        decrementSafeBlockCount( numCurrentReplica );
+        // Drop empty lists so blocksMap only maps blocks with replicas.
+        if( containingNodes.isEmpty() )
+          blocksMap.remove(block);
+        //
+        // It's possible that the block was removed because of a datanode
+        // failure.  If the block is still valid, check if replication is
+        // necessary.  In that case, put block on a possibly-will-
+        // be-replicated list.
+        //
+        FSDirectory.INode fileINode = dir.getFileByBlock(block);
+        if( fileINode != null ) {
+            neededReplications.update(block, -1, 0);
+        }
+
+        //
+        // We've removed a block from a node, so it's definitely no longer
+        // in "excess" there.
+        //
+        Collection<Block> excessBlocks = excessReplicateMap.get(node.getStorageID());
+        if (excessBlocks != null) {
+            excessBlocks.remove(block);
+            NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
+                    +block.getBlockName()+" is removed from excessBlocks" );
+            if (excessBlocks.size() == 0) {
+                excessReplicateMap.remove(node.getStorageID());
+            }
+        }
+    }
+
+    /**
+     * The given node is reporting that it received a certain block.
+     * Updates both the block->datanode map and the node's own block list.
+     *
+     * @throws IllegalArgumentException if the reporting node is not
+     *         registered with this namesystem
+     */
+    public synchronized void blockReceived( DatanodeID nodeID,  
+                                            Block block
+                                          ) throws IOException {
+        DatanodeDescriptor node = getDatanode( nodeID );
+        if (node == null) {
+            NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: "
+             + block.getBlockName() + " is received from an unrecorded node " 
+             + nodeID.getName() );
+            // BUG FIX: the exception message used to print the block name
+            // where the node name was intended; report both correctly.
+            throw new IllegalArgumentException(
+                "Unexpected exception.  Got blockReceived message from node " 
+                + nodeID.getName() + " for block " + block.getBlockName()
+                + ", but there is no info for the node");
+        }
+        NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
+                +block.getBlockName()+" is received from " + nodeID.getName() );
+        //
+        // Modify the blocks->datanode map and node's map.
+        // 
+        node.addBlock( addStoredBlock(block, node) );
+    }
+
+    /**
+     * Total raw bytes across all live datanodes.  Reads the running
+     * total under the heartbeat lock, which protects the statistics.
+     */
+    public long totalCapacity() {
+      synchronized (heartbeats) {
+        return totalCapacity;
+      }
+    }
+
+    /**
+     * Total non-used raw bytes across all live datanodes, read under
+     * the heartbeat lock.
+     */
+    public long totalRemaining() {
+      synchronized (heartbeats) {
+        return totalRemaining;
+      }
+    }
+
+    /**
+     * Total number of active transceiver connections across all live
+     * datanodes, read under the heartbeat lock.
+     */
+    public int totalLoad() {
+      synchronized (heartbeats) {
+        return totalLoad;
+      }
+    }
+
+    /**
+     * Builds a snapshot array containing a DatanodeInfo copy of every
+     * node currently in datanodeMap.
+     */
+    public synchronized DatanodeInfo[] datanodeReport() {
+      synchronized (datanodeMap) {
+        DatanodeInfo results[] = new DatanodeInfo[datanodeMap.size()];
+        int i = 0;
+        for (DatanodeDescriptor node : datanodeMap.values()) {
+          results[i++] = new DatanodeInfo(node);
+        }
+        return results;
+      }
+    }
+    
+    /**
+     */
+    public synchronized void DFSNodesStatus( ArrayList<DatanodeDescriptor> live, 
+                                             ArrayList<DatanodeDescriptor> dead ) {
+      synchronized (datanodeMap) {

[... 1489 lines stripped ...]


Mime
View raw message