hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r431713 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/
Date Tue, 15 Aug 2006 21:50:07 GMT
Author: cutting
Date: Tue Aug 15 14:50:07 2006
New Revision: 431713

URL: http://svn.apache.org/viewvc?rev=431713&view=rev
Log:
HADOOP-456.  Change the DFS namenode to keep a persistent record of the set of known datanodes.
 Contributed by Konstantin.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=431713&r1=431712&r2=431713&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Aug 15 14:50:07 2006
@@ -40,6 +40,12 @@
  9. HADOOP-455.  Fix a bug in Text, where DEL was not permitted.
     (Hairong Kuang via cutting)
 
+10. HADOOP-456.  Change the DFS namenode to keep a persistent record
+    of the set of known datanodes.  This will be used to implement a
+    "safe mode" where filesystem changes are prohibited when a
+    critical percentage of the datanodes are unavailable.
+    (Konstantin Shvachko via cutting)
+
 
 Release 0.5.0 - 2006-08-04
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java?rev=431713&r1=431712&r2=431713&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java Tue Aug 15 14:50:07 2006
@@ -39,7 +39,7 @@
   public DataStorage( File datadir ) throws IOException {
     this( DataNode.DFS_CURRENT_VERSION, datadir );
     
-    if( version != DataNode.DFS_CURRENT_VERSION )
+    if( version < FSConstants.DFS_CURRENT_VERSION ) // future version
       throw new IncorrectVersionException( version, "data storage" );
   }
   
@@ -128,9 +128,7 @@
   public boolean read() throws IOException {
     storageFile.seek(0);
     this.version = storageFile.readInt();
-    UTF8 uID = new UTF8();
-    uID.readFields( storageFile );
-    this.storageID = uID.toString();
+    this.storageID = UTF8.readString( storageFile );
     return false;
   }
 
@@ -142,7 +140,6 @@
   public void write() throws IOException {
     storageFile.seek(0);
     storageFile.writeInt( this.version );
-    UTF8 uID = new UTF8( this.storageID );
-    uID.write( storageFile );
+    UTF8.writeString( storageFile, this.storageID );
   }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java?rev=431713&r1=431712&r2=431713&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java Tue Aug 15 14:50:07 2006
@@ -27,8 +27,12 @@
  **************************************************/
 class DatanodeDescriptor extends DatanodeInfo {
 
-  private volatile TreeSet blocks = null;
+  private volatile TreeSet blocks = new TreeSet();
 
+  DatanodeDescriptor() {
+    super();
+  }
+  
   DatanodeDescriptor( DatanodeID nodeID ) {
     this( nodeID, 0L, 0L, 0 );
   }
@@ -41,7 +45,6 @@
                       long remaining,
                       int xceiverCount ) {
     super( nodeID );
-    this.blocks = new TreeSet();
     updateHeartbeat(capacity, remaining, xceiverCount);
   }
 
@@ -60,6 +63,13 @@
     blocks.add(b);
   }
 
+  void resetBlocks() {
+    this.capacity = 0;
+    this.remaining = 0;
+    this.xceiverCount = 0;
+    this.blocks.clear();
+  }
+  
   /**
    */
   void updateHeartbeat(long capacity, long remaining, int xceiverCount) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?rev=431713&r1=431712&r2=431713&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Tue Aug 15 14:50:07 2006
@@ -113,5 +113,7 @@
     // Version is reflected in the data storage file.
     // Versions are negative.
     // Decrement DFS_CURRENT_VERSION to define a new version.
-    public static final int DFS_CURRENT_VERSION = -2;
+    public static final int DFS_CURRENT_VERSION = -3;
+    // Current version: New operations OP_DATANODE_REMOVE and OP_DATANODE_ADD
+    // are introduced
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java?rev=431713&r1=431712&r2=431713&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java Tue Aug 15 14:50:07 2006
@@ -33,8 +33,6 @@
  * It keeps the filename->blockset mapping always-current
  * and logged to disk.
  * 
- * TODO: Factor out to a standalone class.
- * 
  * @author Mike Cafarella
  *************************************************/
 class FSDirectory implements FSConstants {
@@ -42,6 +40,8 @@
     /******************************************************
      * We keep an in-memory representation of the file/block
      * hierarchy.
+     * 
+     * TODO: Factor out INode to a standalone class.
      ******************************************************/
     class INode {
         private String name;
@@ -305,9 +305,12 @@
     private int numFilesDeleted = 0;
     
     /** Access an existing dfs name directory. */
-    public FSDirectory(File dir, Configuration conf) throws IOException {
-      this.fsImage = new FSImage( dir, conf );
-      fsImage.loadFSImage( this, conf );
+    public FSDirectory(File dir) throws IOException {
+      this.fsImage = new FSImage( dir );
+    }
+    
+    void loadFSImage( Configuration conf ) throws IOException {
+      fsImage.loadFSImage( conf );
       synchronized (this) {
         this.ready = true;
         this.notifyAll();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java?rev=431713&r1=431712&r2=431713&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java Tue Aug 15 14:50:07 2006
@@ -39,6 +39,8 @@
   private static final byte OP_DELETE = 2;
   private static final byte OP_MKDIR = 3;
   private static final byte OP_SET_REPLICATION = 4;
+  private static final byte OP_DATANODE_ADD = 5;
+  private static final byte OP_DATANODE_REMOVE = 6;
   
   private File editsFile;
   DataOutputStream editsStream = null;
@@ -74,9 +76,9 @@
    * This is where we apply edits that we've been writing to disk all
    * along.
    */
-  int loadFSEdits( FSDirectory fsDir, 
-                   Configuration conf
-                 ) throws IOException {
+  int loadFSEdits( Configuration conf ) throws IOException {
+    FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
+    FSDirectory fsDir = fsNamesys.dir;
     int numEdits = 0;
     int logVersion = 0;
     
@@ -170,6 +172,29 @@
             fsDir.unprotectedMkdir(src.toString());
             break;
           }
+          case OP_DATANODE_ADD: {
+            if( logVersion > -3 )
+              throw new IOException("Unexpected opcode " + opcode 
+                  + " for version " + logVersion );
+            DatanodeDescriptor node = new DatanodeDescriptor();
+            node.readFields(in);
+            fsNamesys.unprotectedAddDatanode( node );
+            break;
+          }
+          case OP_DATANODE_REMOVE: {
+            if( logVersion > -3 )
+              throw new IOException("Unexpected opcode " + opcode 
+                  + " for version " + logVersion );
+            DatanodeID nodeID = new DatanodeID();
+            nodeID.readFields(in);
+            DatanodeDescriptor node = fsNamesys.getDatanode( nodeID );
+            if( node != null ) {
+              fsNamesys.unprotectedRemoveDatanode( node );
+              // physically remove node from datanodeMap
+              fsNamesys.wipeDatanode( nodeID );
+            }
+            break;
+          }
           default: {
             throw new IOException("Never seen opcode " + opcode);
           }
@@ -260,11 +285,27 @@
     logEdit(OP_DELETE, src, null);
   }
   
+  /** 
+   * Creates a record in edit log corresponding to a new data node
+   * registration event.
+   */
+  void logAddDatanode( DatanodeDescriptor node ) {
+    logEdit( OP_DATANODE_ADD, node, null );
+  }
+  
+  /** 
+   * Creates a record in edit log corresponding to a data node
+   * removal event.
+   */
+  void logRemoveDatanode( DatanodeID nodeID ) {
+    logEdit( OP_DATANODE_REMOVE, new DatanodeID( nodeID ), null );
+  }
+  
   static UTF8 toLogReplication( short replication ) {
     return new UTF8( Short.toString(replication));
   }
   
   static short fromLogReplication( UTF8 replication ) {
     return Short.parseShort(replication.toString());
-  }    
+  }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java?rev=431713&r1=431712&r2=431713&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java Tue Aug 15 14:50:07 2006
@@ -25,12 +25,12 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Random;
+import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.dfs.FSDirectory.INode;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.io.Writable;
 
 /**
  * FSImage handles checkpointing and logging of the namespace edits.
@@ -49,7 +49,7 @@
   /**
    * 
    */
-  FSImage( File fsDir, Configuration conf ) throws IOException {
+  FSImage( File fsDir ) throws IOException {
     this.imageDir = new File(fsDir, "image");
     if (! imageDir.exists()) {
       throw new IOException("NameNode not formatted: " + fsDir);
@@ -67,9 +67,9 @@
    * filenames and blocks.  Return whether we should
    * "re-save" and consolidate the edit-logs
    */
-  void loadFSImage( FSDirectory fsDir, 
-                    Configuration conf
-                  ) throws IOException {
+  void loadFSImage( Configuration conf ) throws IOException {
+    FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
+    FSDirectory fsDir = fsNamesys.dir;
     File edits = editLog.getEditsFile();
     //
     // Atomic move sequence, to recover from interrupted save
@@ -120,11 +120,7 @@
         
         needToSave = ( imgVersion != FSConstants.DFS_CURRENT_VERSION );
         if( imgVersion < FSConstants.DFS_CURRENT_VERSION ) // future version
-          throw new IOException(
-              "Unsupported version of the file system image: "
-              + imgVersion
-              + ". Current version = " 
-              + FSConstants.DFS_CURRENT_VERSION + "." );
+          throw new IncorrectVersionException(imgVersion, "file system image");
         
         // read file info
         short replication = (short)conf.getInt("dfs.replication", 3);
@@ -147,6 +143,9 @@
           }
           fsDir.unprotectedAddFile(name, blocks, replication );
         }
+        
+        // load datanode info
+        this.loadDatanodes( imgVersion, in );
       } finally {
         in.close();
       }
@@ -155,15 +154,17 @@
     if( fsDir.namespaceID == 0 )
       fsDir.namespaceID = newNamespaceID();
     
-    needToSave |= ( edits.exists() && editLog.loadFSEdits(fsDir, conf) > 0 );
+    needToSave |= ( edits.exists() && editLog.loadFSEdits(conf) > 0 );
     if( needToSave )
-      saveFSImage( fsDir );
+      saveFSImage();
   }
 
   /**
    * Save the contents of the FS image
    */
-  void saveFSImage( FSDirectory fsDir ) throws IOException {
+  void saveFSImage() throws IOException {
+    FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
+    FSDirectory fsDir = fsNamesys.dir;
     File curFile = new File(imageDir, FS_IMAGE);
     File newFile = new File(imageDir, NEW_FS_IMAGE);
     File oldFile = new File(imageDir, OLD_FS_IMAGE);
@@ -177,6 +178,7 @@
       out.writeInt(fsDir.namespaceID);
       out.writeInt(fsDir.rootDir.numItemsInTree() - 1);
       saveImage( "", fsDir.rootDir, out );
+      saveDatanodes( out );
     } finally {
       out.close();
     }
@@ -251,6 +253,34 @@
     for(Iterator it = root.getChildren().values().iterator(); it.hasNext(); ) {
       INode child = (INode) it.next();
       saveImage( fullName, child, out );
+    }
+  }
+
+  /**
+   * Save list of datanodes contained in {@link FSNamesystem#datanodeMap}.
+   * Only the {@link DatanodeInfo} part is stored.
+   * The {@link DatanodeDescriptor#blocks} is transient.
+   * 
+   * @param out output stream
+   * @throws IOException
+   */
+  void saveDatanodes( DataOutputStream out ) throws IOException {
+    TreeMap datanodeMap = FSNamesystem.getFSNamesystem().datanodeMap;
+    int size = datanodeMap.size();
+    out.writeInt( size );
+    for( Iterator it = datanodeMap.values().iterator(); it.hasNext(); )
+      ((DatanodeDescriptor)it.next()).write( out );
+  }
+
+  void loadDatanodes( int version, DataInputStream in ) throws IOException {
+    if( version > -3 ) // pre datanode image version
+      return;
+    FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
+    int size = in.readInt();
+    for( int i = 0; i < size; i++ ) {
+      DatanodeDescriptor node = new DatanodeDescriptor();
+      node.readFields(in);
+      fsNamesys.unprotectedAddDatanode( node );
     }
   }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=431713&r1=431712&r2=431713&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Tue Aug 15 14:50:07 2006
@@ -53,19 +53,30 @@
     //
     Map blocksMap = new HashMap();
 
-    //
-    // Stores the datanode-->block map.  Done by storing a 
-    // set of datanode info objects, sorted by name.  Updated only in
-    // response to client-sent information.
-    // Mapping: StorageID -> DatanodeDescriptor
-    //
+    /**
+     * Stores the datanode -> block map.  
+     * <p>
+     * Done by storing a set of {@link DatanodeDescriptor} objects, sorted by 
+     * storage id. In order to keep the storage map consistent it tracks 
+     * all storages ever registered with the namenode.
+     * A descriptor corresponding to a specific storage id can be
+     * <ul> 
+     * <li>added to the map if it is a new storage id;</li>
+     * <li>updated with a new datanode started as a replacement for the old one 
+     * with the same storage id; and </li>
+     * <li>removed if and only if an existing datanode is restarted to serve a
+     * different storage id.</li>
+     * </ul> <br>
+     * The list of the {@link DatanodeDescriptor}s in the map is checkpointed
+     * in the namespace image file. Only the {@link DatanodeInfo} part is 
+     * persistent, the list of blocks is restored from the datanode block
+     * reports. 
+     * <p>
+     * Mapping: StorageID -> DatanodeDescriptor
+     */
     TreeMap datanodeMap = new TreeMap();
 
     //
-    // Stores the set of dead datanodes
-    TreeMap deaddatanodeMap = new TreeMap();
-    
-    //
     // Keeps a Vector for every named machine.  The Vector contains
     // blocks that have recently been invalidated and are thought to live
     // on the machine in question.
@@ -110,9 +121,13 @@
     //
     Random r = new Random();
 
-    //
-    // Stores a set of DatanodeDescriptor objects, sorted by heartbeat
-    //
+    /**
+     * Stores a set of DatanodeDescriptor objects, sorted by heartbeat.
+     * This is a subset of {@link #datanodeMap}, containing nodes that are 
+     * considered alive.
+     * The {@link HeartbeatMonitor} periodically checks for outdated entries,
+     * and removes them from the set.
+     */
     TreeSet heartbeats = new TreeSet(new Comparator() {
         public int compare(Object o1, Object o2) {
             DatanodeDescriptor d1 = (DatanodeDescriptor) o1;
@@ -180,7 +195,8 @@
         InetSocketAddress addr = DataNode.createSocketAddr(conf.get("fs.default.name", "local"));
         this.localMachine = addr.getHostName();
         this.port = addr.getPort();
-        this.dir = new FSDirectory(dir, conf);
+        this.dir = new FSDirectory(dir);
+        this.dir.loadFSImage( conf );
         this.hbthread = new Daemon(new HeartbeatMonitor());
         this.lmthread = new Daemon(new LeaseMonitor());
         hbthread.start();
@@ -1068,37 +1084,39 @@
         // nodeN previously served a different data storage, 
         // which is not served by anybody anymore.
         removeDatanode( nodeN );
+        // physically remove node from datanodeMap
+        wipeDatanode( nodeN );
+        // and log removal
+        getEditLog().logRemoveDatanode( nodeN );
         nodeN = null;
       }
       
       // nodeN is not found
-      if( nodeS == null ) {
-        // this is a new datanode serving a new data storage
-        if( nodeReg.getStorageID().equals("") ) {
-          // this data storage has never registered
-          // it is either empty or was created by pre-storageID version of DFS
-          nodeReg.storageID = newStorageID();
-          NameNode.stateChangeLog.debug(
-              "BLOCK* NameSystem.registerDatanode: "
-              + "new storageID " + nodeReg.getStorageID() + " assigned." );
-        }
-        // register new datanode
-        datanodeMap.put(nodeReg.getStorageID(), 
-                        new DatanodeDescriptor( nodeReg ));
+      if( nodeS != null ) {
+        // nodeS is found
+        // The registering datanode is a replacement node for the existing 
+        // data storage, which from now on will be served by a new node.
         NameNode.stateChangeLog.debug(
             "BLOCK* NameSystem.registerDatanode: "
-            + "node registered." );
+            + "node " + nodeS.name
+            + " is replaced by " + nodeReg.getName() + "." );
+        nodeS.name = nodeReg.getName();
         return;
       }
 
-      // nodeS is found
-      // The registering datanode is a replacement node for the existing 
-      // data storage, which from now on will be served by a new node.
-      NameNode.stateChangeLog.debug(
-          "BLOCK* NameSystem.registerDatanode: "
-          + "node " + nodeS.name
-          + " is replaced by " + nodeReg.getName() + "." );
-      nodeS.name = nodeReg.getName();
+      // this is a new datanode serving a new data storage
+      if( nodeReg.getStorageID().equals("") ) {
+        // this data storage has never been registered
+        // it is either empty or was created by pre-storageID version of DFS
+        nodeReg.storageID = newStorageID();
+        NameNode.stateChangeLog.debug(
+            "BLOCK* NameSystem.registerDatanode: "
+            + "new storageID " + nodeReg.getStorageID() + " assigned." );
+      }
+      // register new datanode
+      DatanodeDescriptor nodeDescr = new DatanodeDescriptor( nodeReg );
+      unprotectedAddDatanode( nodeDescr );
+      getEditLog().logAddDatanode( nodeDescr );
       return;
     }
     
@@ -1106,7 +1124,7 @@
      * Get registrationID for datanodes based on the namespaceID.
      * 
      * @see #registerDatanode(DatanodeRegistration)
-     * @see FSDirectory#newNamespaceID()
+     * @see FSImage#newNamespaceID()
      * @return registration ID
      */
     public String getRegistrationID() {
@@ -1142,27 +1160,14 @@
                                           int xceiverCount) throws IOException {
       synchronized (heartbeats) {
         synchronized (datanodeMap) {
-          long capacityDiff = 0;
-          long remainingDiff = 0;
           DatanodeDescriptor nodeinfo = getDatanode( nodeID );
-          deaddatanodeMap.remove(nodeID.getName());
-
-          if (nodeinfo == null) {
-            NameNode.stateChangeLog.debug("BLOCK* NameSystem.gotHeartbeat: "
-                    +"brand-new heartbeat from "+nodeID.getName() );
-            nodeinfo = new DatanodeDescriptor(nodeID, capacity, remaining, xceiverCount);
-            datanodeMap.put(nodeinfo.getStorageID(), nodeinfo);
-            capacityDiff = capacity;
-            remainingDiff = remaining;
-          } else {
-            capacityDiff = capacity - nodeinfo.getCapacity();
-            remainingDiff = remaining - nodeinfo.getRemaining();
-            heartbeats.remove(nodeinfo);
-            nodeinfo.updateHeartbeat(capacity, remaining, xceiverCount);
-          }
-          heartbeats.add(nodeinfo);
-          totalCapacity += capacityDiff;
-          totalRemaining += remainingDiff;
+          
+          if (nodeinfo == null) 
+            // We do not accept unregistered guests
+            throw new UnregisteredDatanodeException( nodeID );
+          removeHeartbeat(nodeinfo);
+          nodeinfo.updateHeartbeat(capacity, remaining, xceiverCount);
+          addHeartbeat(nodeinfo);
         }
       }
     }
@@ -1206,18 +1211,58 @@
    * @author hairong
    */
     private void removeDatanode( DatanodeDescriptor nodeInfo ) {
-      heartbeats.remove(nodeInfo);
-      datanodeMap.remove(nodeInfo.getStorageID());
-      deaddatanodeMap.put(nodeInfo.getName(), nodeInfo);
-      NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeDatanode: "
-              + nodeInfo.getName() + " is removed from datanodeMap");
-      totalCapacity -= nodeInfo.getCapacity();
-      totalRemaining -= nodeInfo.getRemaining();
+      removeHeartbeat(nodeInfo);
 
       Block deadblocks[] = nodeInfo.getBlocks();
       if( deadblocks != null )
         for( int i = 0; i < deadblocks.length; i++ )
           removeStoredBlock(deadblocks[i], nodeInfo);
+      unprotectedRemoveDatanode(nodeInfo);
+    }
+
+    void unprotectedRemoveDatanode( DatanodeDescriptor nodeDescr ) {
+      // datanodeMap.remove(nodeDescr.getStorageID());
+      // deaddatanodeMap.put(nodeDescr.getName(), nodeDescr);
+      nodeDescr.resetBlocks();
+      NameNode.stateChangeLog.debug(
+          "BLOCK* NameSystem.unprotectedRemoveDatanode: "
+          + nodeDescr.getName() + " is out of service now.");
+    }
+    
+    void unprotectedAddDatanode( DatanodeDescriptor nodeDescr ) {
+      datanodeMap.put( nodeDescr.getStorageID(), nodeDescr );
+      NameNode.stateChangeLog.debug(
+          "BLOCK* NameSystem.unprotectedAddDatanode: "
+          + "node " + nodeDescr.getName() + " is added to datanodeMap." );
+    }
+
+    private void addHeartbeat( DatanodeDescriptor nodeDescr ) {
+      heartbeats.add(nodeDescr);
+      totalCapacity += nodeDescr.capacity;
+      totalRemaining += nodeDescr.remaining;
+    }
+    
+    private void removeHeartbeat( DatanodeDescriptor nodeDescr ) {
+      totalCapacity -= nodeDescr.getCapacity();
+      totalRemaining -= nodeDescr.getRemaining();
+      heartbeats.remove(nodeDescr);
+    }
+    
+    /**
+     * Physically remove node from datanodeMap.
+     * 
+     * @param nodeID node
+     */
+    void wipeDatanode( DatanodeID nodeID ) {
+      datanodeMap.remove(nodeID.getStorageID());
+      NameNode.stateChangeLog.debug(
+          "BLOCK* NameSystem.wipeDatanode: "
+          + nodeID.getName() + " storage " + nodeID.getStorageID() 
+          + " is removed from datanodeMap.");
+    }
+    
+    private FSEditLog getEditLog() {
+      return dir.fsImage.getEditLog();
     }
 
     /**
@@ -1541,18 +1586,22 @@
     /**
      */
     public void DFSNodesStatus(Vector live, Vector dead) {
-        synchronized (heartbeats) {
-            synchronized (datanodeMap) {
-                live.addAll(datanodeMap.values());
-                dead.addAll(deaddatanodeMap.values());
-            }
+      synchronized (heartbeats) {
+        synchronized (datanodeMap) {
+          for(Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) {
+            DatanodeDescriptor node = (DatanodeDescriptor)it.next();
+            if( node.isDead() )
+              dead.add( node );
+            else
+              live.add( node );
+          }
         }
+      }
     }
     /** 
      */
-    public DatanodeDescriptor getDataNodeInfo(String name) {
-        UTF8 src = new UTF8(name);
-        return (DatanodeDescriptor)datanodeMap.get(src);
+    public DatanodeInfo getDataNodeInfo(String name) {
+        return (DatanodeDescriptor)datanodeMap.get(name);
     }
     /** 
      */
@@ -1715,6 +1764,9 @@
     /**
      * Get a certain number of targets, if possible.
      * If not, return as many as we can.
+     * Only live nodes contained in {@link #heartbeats} are 
+     * targeted for replication.
+     * 
      * @param desiredReplicates
      *          number of duplicates wanted.
      * @param forbiddenNodes
@@ -1723,11 +1775,11 @@
      */
     DatanodeDescriptor[] chooseTargets(int desiredReplicates, TreeSet forbiddenNodes,
                                  UTF8 clientMachine, long blockSize) {
-        if (desiredReplicates > datanodeMap.size()) {
+        if (desiredReplicates > heartbeats.size()) {
           LOG.warn("Replication requested of "+desiredReplicates
-                      +" is larger than cluster size ("+datanodeMap.size()
+                      +" is larger than cluster size ("+heartbeats.size()
                       +"). Using cluster size.");
-          desiredReplicates  = datanodeMap.size();
+          desiredReplicates  = heartbeats.size();
         }
 
         TreeSet alreadyChosen = new TreeSet();
@@ -1761,7 +1813,7 @@
         //
         // Check if there are any available targets at all
         //
-        int totalMachines = datanodeMap.size();
+        int totalMachines = heartbeats.size();
         if (totalMachines == 0) {
             LOG.warn("While choosing target, totalMachines is " + totalMachines);
             return null;
@@ -1789,7 +1841,7 @@
         // Build list of machines we can actually choose from
         //
         Vector targetList = new Vector();
-        for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) {
+        for (Iterator it = heartbeats.iterator(); it.hasNext(); ) {
             DatanodeDescriptor node = (DatanodeDescriptor) it.next();
             if (! forbiddenMachines.contains(node.getHost())) {
                 targetList.add(node);
@@ -1941,7 +1993,7 @@
       }
       return null;
     }
-    /** Stop at and return the detanode at index (used for content browsing)*/
+    /** Stop at and return the datanode at index (used for content browsing)*/
     private DatanodeInfo getDatanodeByIndex( int index ) {
       int i = 0;
       for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) {



Mime
View raw message