hadoop-common-commits mailing list archives

From: cutt...@apache.org
Subject: svn commit: r510181 [1/3] - in /lucene/hadoop/trunk: ./ conf/ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/util/ src/test/org/apache/hadoop/dfs/
Date: Wed, 21 Feb 2007 20:11:01 GMT
Author: cutting
Date: Wed Feb 21 12:11:00 2007
New Revision: 510181

URL: http://svn.apache.org/viewvc?view=rev&rev=510181
Log:
HADOOP-442.  Permit one to specify hosts allowed to connect to namenode and jobtracker with
include and exclude files.  Contributed by Wendy Chien.
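
A minimal usage sketch, assuming the standard bin/hadoop launcher and a
one-host-per-line file format (the path below is hypothetical): list the
permitted hosts in the file named by dfs.hosts, list hosts to shut out in the
file named by dfs.hosts.exclude (likewise mapred.hosts and mapred.hosts.exclude
for the jobtracker), then ask the namenode to reread them:

    $ cat /etc/hadoop/dfs.exclude
    datanode07.example.com
    $ bin/hadoop dfsadmin -refreshNodes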

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DisallowedDatanodeException.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java.orig
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/DisallowedTaskTrackerException.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/util/HostsFileReader.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=510181&r1=510180&r2=510181
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Feb 21 12:11:00 2007
@@ -74,6 +74,10 @@
 22. HADOOP-947.  Improve performance of datanode decomissioning.
     (Dhruba Borthakur via cutting)
 
+23. HADOOP-442.  Permit one to specify hosts allowed to connect to
+    namenode and jobtracker with include and exclude files.  (Wendy
+    Chien via cutting)
+
 
 Release 0.11.2 - 2007-02-16
 

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?view=diff&rev=510181&r1=510180&r2=510181
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Wed Feb 21 12:11:00 2007
@@ -356,6 +356,24 @@
 </property>
 
 <property>
+  <name>dfs.hosts</name>
+  <value></value>
+  <description>Names a file that contains a list of hosts that are
+  permitted to connect to the namenode. The full pathname of the file
+  must be specified.  If the value is empty, all hosts are
+  permitted.</description>
+</property>
+
+<property>
+  <name>dfs.hosts.exclude</name>
+  <value></value>
+  <description>Names a file that contains a list of hosts that are
+  not permitted to connect to the namenode.  The full pathname of the
+  file must be specified.  If the value is empty, no hosts are
+  excluded.</description>
+</property> 
+
+<property>
   <name>fs.s3.block.size</name>
   <value>1048576</value>
   <description>
@@ -698,6 +716,22 @@
   				retained.
   </description>
 </property>
+
+<property>
+  <name>mapred.hosts</name>
+  <value></value>
+  <description>Names a file that contains the list of nodes that may
+  connect to the jobtracker.  If the value is empty, all hosts are
+  permitted.</description>
+</property>
+
+<property>
+  <name>mapred.hosts.exclude</name>
+  <value></value>
+  <description>Names a file that contains the list of hosts that
+  should be excluded by the jobtracker.  If the value is empty, no
+  hosts are excluded.</description>
+</property> 
 
 <property>
   <name>jobclient.output.filter</name>
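
Both pairs of properties default to empty, which disables the filtering
entirely.  A minimal hadoop-site.xml sketch enabling an exclude file for the
namenode (the path is hypothetical):

    <property>
      <name>dfs.hosts.exclude</name>
      <value>/etc/hadoop/dfs.exclude</value>
    </property>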

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java?view=diff&rev=510181&r1=510180&r2=510181
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java Wed Feb 21 12:11:00 2007
@@ -29,7 +29,7 @@
  **********************************************************************/
 interface ClientProtocol extends VersionedProtocol {
 
-  public static final long versionID = 7L;  // periodic checkpoint added
+    public static final long versionID = 8L; // refreshNodes added
   
     ///////////////////////////////////////
     // File contents
@@ -313,7 +313,12 @@
      */
     public boolean setSafeMode( FSConstants.SafeModeAction action ) throws IOException;
 
-    public boolean decommission( FSConstants.DecommissionAction action, String[] nodenames) throws IOException;
+    /**
+     * Tells the namenode to reread the hosts and exclude files. 
+     * @throws IOException
+     */
+    public void refreshNodes() throws IOException;
+
 
     /**
      * Get the size of the current edit log (in bytes).

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java?view=diff&rev=510181&r1=510180&r2=510181
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java Wed Feb 21 12:11:00 2007
@@ -132,61 +132,23 @@
     }
 
     /**
-     * Command related to decommission of a datanode.
-     * Usage: java DFSAdmin -decommission [enter | leave | get]
-     * @param argv List of command line parameters. Each of these items
-              could be a hostname or a hostname:portname.
-     * @param idx The index of the command that is being processed.
-     * @exception IOException if the filesystem does not exist.
-     * @return 0 on success, non zero on error.
+     * Command to ask the namenode to reread the hosts and excluded hosts 
+     * files.
+     * Usage: java DFSAdmin -refreshNodes
+     * @exception IOException 
      */
-    public int decommission(String[] argv, int idx) throws IOException {
+    public int refreshNodes() throws IOException {
       int exitCode = -1;
 
       if (!(fs instanceof DistributedFileSystem)) {
         System.out.println("FileSystem is " + fs.getName());
         return exitCode;
       }
-      if (idx >= argv.length - 1) {
-        printUsage("-decommission");
-        return exitCode;
-      }
-      
-      //
-      // Copy all the datanode names to nodes[]
-      //
-      String[] nodes = new String[argv.length - idx - 1];
-      for (int i = idx + 1, j = 0; i < argv.length; i++, j++) {
-        nodes[j] = argv[i];
-      }
 
-      FSConstants.DecommissionAction action;
-
-      if ("set".equalsIgnoreCase(argv[idx])) {
-        action = FSConstants.DecommissionAction.DECOMMISSION_SET;
-      } else if ("clear".equalsIgnoreCase(argv[idx])) {
-        action = FSConstants.DecommissionAction.DECOMMISSION_CLEAR;
-      } else if ("get".equalsIgnoreCase(argv[idx])) {
-        action = FSConstants.DecommissionAction.DECOMMISSION_GET;
-      } else {
-        printUsage("-decommission");
-        return exitCode;
-      }
       DistributedFileSystem dfs = (DistributedFileSystem) fs;
-      boolean mode = dfs.decommission(action, nodes);
-
-      if (action == FSConstants.DecommissionAction.DECOMMISSION_GET) {
-        if (mode) {
-          System.out.println("Node(s) has finished decommission");
-        }
-        else {
-          System.out.println("Node(s) have not yet been decommissioned");
-        }
-        return 0;
-      }
-      if (mode) {
-        return 0; // success
-      }
+      dfs.refreshNodes();
+      exitCode = 0;
+   
       return exitCode;
     }
 
@@ -197,19 +159,18 @@
     public void printUsage(String cmd) {
           if ("-report".equals(cmd)) {
             System.err.println("Usage: java DFSAdmin"
-                + " [report]");
+                + " [-report]");
           } else if ("-safemode".equals(cmd)) {
             System.err.println("Usage: java DFSAdmin"
                 + " [-safemode enter | leave | get | wait]");
-          } else if ("-decommission".equals(cmd)) {
+          } else if ("-refreshNodes".equals(cmd)) {
             System.err.println("Usage: java DFSAdmin"
-                + " [-decommission set | clear | get "
-                + "[datanode1[, datanode2..]]");
+                + " [-refreshNodes]");
           } else {
             System.err.println("Usage: java DFSAdmin");
             System.err.println("           [-report]");
             System.err.println("           [-safemode enter | leave | get | wait]");
-            System.err.println("           [-decommission set | clear | get]");
+            System.err.println("           [-refreshNodes]");
           }
     }
 
@@ -242,13 +203,14 @@
                   printUsage(cmd);
                   return exitCode;
                 }
-        } else if ("-decommission".equals(cmd)) {
-                if (argv.length < 2) {
+        } else if ("-refreshNodes".equals(cmd)) {
+                if (argv.length != 1) {
                   printUsage(cmd);
                   return exitCode;
                 }
         }
 
+
         // initialize DFSAdmin
         try {
             init();
@@ -267,8 +229,8 @@
                 report();
             } else if ("-safemode".equals(cmd)) {
                 setSafeMode(argv, i);
-            } else if ("-decommission".equals(cmd)) {
-                exitCode = decommission(argv, i);
+            } else if ("-refreshNodes".equals(cmd)) {
+                exitCode = refreshNodes();
             } else {
                 exitCode = -1;
                 System.err.println(cmd.substring(1) + ": Unknown command");

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=510181&r1=510180&r2=510181
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Wed Feb 21 12:11:00 2007
@@ -370,17 +370,16 @@
     }
 
     /**
-     * Set, clear decommission state of datnode(s).
-     * See {@link ClientProtocol#decommission(FSConstants.DecommissionAction)} 
+     * Refresh the hosts and exclude files.  (Rereads them.)
+     * See {@link ClientProtocol#refreshNodes()} 
      * for more details.
      * 
-     * @see ClientProtocol#decommission(FSConstants.DecommissionAction)
+     * @see ClientProtocol#refreshNodes()
      */
-    public boolean decommission(DecommissionAction action, String[] nodes)
-                                throws IOException {
-      return namenode.decommission(action, nodes);
+    public void refreshNodes() throws IOException {
+      namenode.refreshNodes();
     }
-
+    
     /**
      */
     public boolean mkdirs(UTF8 src) throws IOException {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?view=diff&rev=510181&r1=510180&r2=510181
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Wed Feb 21 12:11:00 2007
@@ -499,7 +499,8 @@
           return;
         } catch( RemoteException re ) {
           String reClass = re.getClassName();
-          if( UnregisteredDatanodeException.class.getName().equals( reClass )) {
+          if( UnregisteredDatanodeException.class.getName().equals( reClass ) ||
+              DisallowedDatanodeException.class.getName().equals( reClass )) {
             LOG.warn( "DataNode is shutting down: " + 
                       StringUtils.stringifyException(re));
             shutdown();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java?view=diff&rev=510181&r1=510180&r2=510181
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java Wed Feb 21 12:11:00 2007
@@ -190,7 +190,6 @@
    * Sets the admin state to indicate that decommision is complete.
    */
    void setDecommissioned() {
-     assert isDecommissionInProgress();
      adminState = AdminStates.DECOMMISSIONED;
    }
 

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DisallowedDatanodeException.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DisallowedDatanodeException.java?view=auto&rev=510181
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DisallowedDatanodeException.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DisallowedDatanodeException.java Wed Feb 21 12:11:00 2007
@@ -0,0 +1,18 @@
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+
+
+/**
+ * This exception is thrown when a datanode tries to register or communicate
+ * with the namenode when it does not appear on the list of included nodes, 
+ * or has been specifically excluded.
+ * 
+ * @author Wendy Chien
+ */
+class DisallowedDatanodeException extends IOException {
+
+  public DisallowedDatanodeException( DatanodeID nodeID ) {
+    super("Datanode denied communication with namenode: " + nodeID.getName() );
+  }
+}

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?view=diff&rev=510181&r1=510180&r2=510181
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java Wed Feb 21 12:11:00 2007
@@ -343,12 +343,11 @@
       return dfs.setSafeMode( action );
     }
 
-    /**
-     * Set, clear decommission of a set of datanodes.
+    /**
+     * Refreshes the list of hosts and excluded hosts from the configured 
+     * files.  
      */
-    public boolean decommission(FSConstants.DecommissionAction action,
-                                String[] nodes)
-    throws IOException {
-      return dfs.decommission(action, nodes);
+    public void refreshNodes() throws IOException {
+      dfs.refreshNodes();
     }
 }
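
Admin-side callers reach the new RPC through this method.  A minimal sketch
(mirroring the instanceof check in DFSAdmin.refreshNodes above; the imports
are org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.FileSystem and
org.apache.hadoop.dfs.DistributedFileSystem):

    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    if (fs instanceof DistributedFileSystem) {
      // Asks the namenode to reread the dfs.hosts / dfs.hosts.exclude files.
      ((DistributedFileSystem) fs).refreshNodes();
    }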

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?view=diff&rev=510181&r1=510180&r2=510181
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Wed Feb 21 12:11:00 2007
@@ -122,9 +122,6 @@
     // SafeMode actions
     public enum SafeModeAction{ SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET; }
 
-    // decommission administrative actions
-    public enum DecommissionAction{ DECOMMISSION_SET, DECOMMISSION_CLEAR, DECOMMISSION_GET; }
-
     // Version is reflected in the dfs image and edit log files.
     // Version is reflected in the data storage file.
     // Versions are negative.

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=510181&r1=510180&r2=510181
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Wed Feb 21 12:11:00 2007
@@ -204,6 +204,9 @@
     // for block replicas placement
     Replicator replicator = new Replicator();
 
+    private HostsFileReader hostsReader; 
+    private Daemon dnthread = null;
+
     /**
      * dirs is a list oif directories where the filesystem directory state 
      * is stored
@@ -252,6 +255,11 @@
         replthread.start();
         this.systemStart = now();
         this.startTime = new Date(systemStart); 
+        
+        this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
+                                               conf.get("dfs.hosts.exclude",""));
+        this.dnthread = new Daemon(new DecommissionedMonitor());
+        dnthread.start();
 
         this.infoPort = conf.getInt("dfs.info.port", 50070);
         this.infoBindAddress = conf.get("dfs.info.bindAddress", "0.0.0.0");
@@ -292,6 +300,7 @@
             infoServer.stop();
             hbthread.join(3000);
             replthread.join(3000);
+            dnthread.join(3000);
         } catch (InterruptedException ie) {
         } finally {
           // using finally to ensure we also wait for lease daemon
@@ -1575,6 +1584,11 @@
           (System.currentTimeMillis() - heartbeatExpireInterval));
     }
     
+    void setDatanodeDead(DatanodeID nodeID) throws IOException {
+        DatanodeDescriptor node = getDatanode(nodeID);
+        node.setLastUpdate(0);
+    }
+
     /**
      * The given node has reported in.  This method should:
      * 1) Record the heartbeat, so the datanode isn't timed out
@@ -1606,6 +1620,12 @@
               return true;
           }
           
+          // Check if this datanode should actually be shutdown instead. 
+          if (shouldNodeShutdown(nodeinfo)) {
+              setDatanodeDead(nodeinfo);
+              throw new DisallowedDatanodeException(nodeinfo);
+          }
+
           if( !nodeinfo.isAlive ) {
               return true;
           } else {
@@ -1916,6 +1936,12 @@
           +"from "+nodeID.getName()+" "+newReport.length+" blocks" );
         DatanodeDescriptor node = getDatanode( nodeID );
 
+        // Check if this datanode should actually be shutdown instead.
+        if (shouldNodeShutdown(node)) {
+          setDatanodeDead(node);
+          throw new DisallowedDatanodeException(node);
+        }
+
         //
         // Modify the (block-->datanode) map, according to the difference
         // between the old and new block report.
@@ -2198,8 +2224,16 @@
                 "Unexpected exception.  Got blockReceived message from node " 
                 + block.getBlockName() + ", but there is no info for it");
         }
+
         NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
                 +block.getBlockName()+" is received from " + nodeID.getName() );
+
+        // Check if this datanode should actually be shutdown instead.
+        if (shouldNodeShutdown(node)) {
+            setDatanodeDead(node);
+            throw new DisallowedDatanodeException(node);
+        }
+
         //
         // Modify the blocks->datanode map and node's map.
         // 
@@ -2260,100 +2294,33 @@
       }
     }
 
+
     /**
-     * Start decommissioning the specified datanodes. If a datanode is
-     * already being decommissioned, then this is a no-op.
+     * Start decommissioning the specified datanode. 
      */
-    public synchronized void startDecommission (String[] nodes) 
-                             throws IOException {
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot decommission node ", safeMode);
-      }
-      boolean isError = false;
-      String badnodes = "";
-
-      synchronized (datanodeMap) {
-        for (int i = 0; i < nodes.length; i++) {
-          boolean found = false;
-          for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
-               it.hasNext(); ) {
-            DatanodeDescriptor node = it.next();
+    private void startDecommission (DatanodeDescriptor node) 
+        throws IOException {
 
-            //
-            // If this is a node that we are interested in, set its admin state.
-            //
-            if (node.getName().equals(nodes[i]) || 
-                node.getHost().equals(nodes[i])) {
-              found = true;
-              if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
-                LOG.info("Start Decommissioning node " + node.name);
-                node.startDecommission();
-                //
-                // all those blocks that resides on this node has to be 
-                // replicated.
-                Block decommissionBlocks[] = node.getBlocks();
-                for (int j = 0; j < decommissionBlocks.length; j++) {
-                    neededReplications.update(decommissionBlocks[j], -1, 0);
-                }
-              }
-              break;
-            }
-          }
-          //
-          // Record the fact that a specified node was not found
-          //
-          if (!found) {
-            badnodes += nodes[i] + " ";
-            isError = true;
-          }
+      if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
+        LOG.info("Start Decommissioning node " + node.name);
+        node.startDecommission();
+        //
+        // all the blocks that reside on this node have to be 
+        // replicated.
+        Block decommissionBlocks[] = node.getBlocks();
+        for (int j = 0; j < decommissionBlocks.length; j++) {
+            neededReplications.update(decommissionBlocks[j], -1, 0);
         }
       }
-      if (isError) {
-        throw new IOException("Nodes " + badnodes + " not found");
-      }
     }
 
     /**
      * Stop decommissioning the specified datanodes.
      */
-    public synchronized void stopDecommission (String[] nodes) 
+    public void stopDecommission (DatanodeDescriptor node) 
                              throws IOException {
-      if (isInSafeMode()) {
-        throw new SafeModeException("Cannot decommission node ", safeMode);
-      }
-      boolean isError = false;
-      String badnodes = "";
-
-      synchronized (datanodeMap) {
-        for (int i = 0; i < nodes.length; i++) {
-          boolean found = false;
-          for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
-               it.hasNext(); ) {
-            DatanodeDescriptor node = it.next();
-
-            //
-            // If this is a node that we are interested in, set its admin state.
-            //
-            if (node.getName().equals(nodes[i]) || 
-                node.getHost().equals(nodes[i])) {
-              LOG.info("Stop Decommissioning node " + node.name);
-              found = true;
-              node.stopDecommission();
-              break;
-            }
-          }
-          //
-          // Record the fact that a specified node was not found
-          //
-          if (!found) {
-            badnodes += nodes[i] + " ";
-            isError = true;
-          }
-        }
-      }
-      if (isError) {
-        throw new IOException("Nodes " + badnodes + " not found");
-      }
+      LOG.info("Stop Decommissioning node " + node.name);
+      node.stopDecommission();
     }
 
     /**
@@ -2620,7 +2587,7 @@
                 && (excessBlocks == null || ! excessBlocks.contains(block))) {
               // filter out containingNodes that are marked for decommission.
               List<DatanodeDescriptor> nodes = 
-                filterDecommissionedNodes(containingNodes);
+                  filterDecommissionedNodes(containingNodes);
               int numCurrentReplica = nodes.size();
               DatanodeDescriptor targets[] = replicator.chooseTarget(
                   Math.min( fileINode.getReplication() - numCurrentReplica,
@@ -3157,6 +3124,124 @@
       
     } //end of Replicator
 
+
+    // Keeps track of which datanodes are allowed to connect to the namenode.
+        
+    private boolean inHostsList(DatanodeID node) {
+      Set<String> hostsList = hostsReader.getHosts();
+      return (hostsList.isEmpty() || 
+              hostsList.contains(node.getName()) || 
+              hostsList.contains(node.getHost()));
+    }
+
+
+    private boolean inExcludedHostsList(DatanodeID node) {
+      Set<String> excludeList = hostsReader.getExcludedHosts();
+      return (excludeList.contains(node.getName()) ||
+              excludeList.contains(node.getHost()));
+    }
+
+    /**
+     * Rereads the files to update the hosts and exclude lists.  It
+     * checks if any of the hosts have changed states:
+     * 1. Added to hosts  --> no further work needed here.
+     * 2. Removed from hosts --> mark AdminState as decommissioned. 
+     * 3. Added to exclude --> start decommission.
+     * 4. Removed from exclude --> stop decommission.
+     */
+    void refreshNodes() throws IOException {
+      hostsReader.refresh();
+      synchronized (this) {
+        for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
+             it.hasNext(); ) {
+          DatanodeDescriptor node = it.next();
+          // Check if not include.
+          if (!inHostsList(node)) {
+            node.setDecommissioned();  // case 2.
+          } else {
+            if (inExcludedHostsList(node)) {
+              if (!node.isDecommissionInProgress() && 
+                  !node.isDecommissioned()) {
+                startDecommission(node);   // case 3.
+              }
+            } else {
+              if (node.isDecommissionInProgress() || 
+                  node.isDecommissioned()) {
+                stopDecommission(node);   // case 4.
+              } 
+            }
+          }
+        }
+      } 
+      
+    }
+    
+
+    /**
+     * Checks if the node is not on the hosts list.  If it is not, then
+     * it will be ignored.  If the node is in the hosts list, but is also 
+     * on the exclude list, then it will be decommissioned.
+     * Returns FALSE if node is rejected for registration. 
+     * Returns TRUE if node is registered (including when it is on the 
+     * exclude list and is being decommissioned). 
+     */
+    public synchronized boolean verifyNodeRegistration(DatanodeRegistration nodeReg) 
+      throws IOException {
+      if (!inHostsList(nodeReg)) {
+        return false;    
+      }
+      if (inExcludedHostsList(nodeReg)) {
+        DatanodeDescriptor node = getDatanode(nodeReg);
+        if (!checkDecommissionStateInternal(node)) {
+          startDecommission(node);
+        }
+      } 
+      return true;
+    }
+    
+    /**
+     * Checks if the Admin state bit is DECOMMISSIONED.  If so, then 
+     * we should shut it down. 
+     * 
+     * Returns true if the node should be shutdown.
+     */
+    private boolean shouldNodeShutdown(DatanodeDescriptor node) {
+      return (node.isDecommissioned());
+    }
+
+    /**
+     * Check if any of the nodes being decommissioned has finished 
+     * moving all its datablocks to another replica. This is a loose
+     * heuristic to determine when a decommission is really over.
+     */
+    public synchronized void decommissionedDatanodeCheck() {
+      for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
+           it.hasNext(); ) {
+        DatanodeDescriptor node = it.next();  
+        checkDecommissionStateInternal(node);
+      }
+    }
+    
+    /**
+     * Periodically calls decommissionedDatanodeCheck().
+     */
+    class DecommissionedMonitor implements Runnable {
+        
+      public void run() {
+        while (fsRunning) {
+          try {
+            decommissionedDatanodeCheck();
+          } catch (Exception e) {
+            FSNamesystem.LOG.info(StringUtils.stringifyException(e));
+          }
+          try {
+            Thread.sleep(1000 * 60 * 5);
+          } catch (InterruptedException ie) {
+          }
+        }
+      }
+    }
+    
 
     /**
      * Information about the file while it is being written to.



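HostsFileReader itself is added by this commit but appears in a later part of
this three-part message.  From its use in FSNamesystem above (a constructor
taking the two configured file names, plus refresh(), getHosts() and
getExcludedHosts()), a rough sketch of the class, assuming a
whitespace-separated hosts-file format; this is not necessarily the committed
implementation:

    package org.apache.hadoop.util;

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Set;

    // Sketch of the interface FSNamesystem and JobTracker rely on.
    public class HostsFileReader {
      private final String includesFile;  // dfs.hosts / mapred.hosts
      private final String excludesFile;  // dfs.hosts.exclude / mapred.hosts.exclude
      private final Set<String> includes = new HashSet<String>();
      private final Set<String> excludes = new HashSet<String>();

      public HostsFileReader(String inFile, String exFile) throws IOException {
        this.includesFile = inFile;
        this.excludesFile = exFile;
        refresh();
      }

      // Assumption: hosts files contain whitespace-separated host names.
      private static void readFileToSet(String filename, Set<String> set)
          throws IOException {
        BufferedReader reader = new BufferedReader(new FileReader(filename));
        try {
          String line;
          while ((line = reader.readLine()) != null) {
            for (String name : line.split("\\s+")) {
              if (name.length() > 0) {
                set.add(name);
              }
            }
          }
        } finally {
          reader.close();
        }
      }

      // Rereads both files; FSNamesystem.refreshNodes() calls this before
      // rechecking each datanode's admin state.
      public synchronized void refresh() throws IOException {
        includes.clear();
        excludes.clear();
        if (includesFile.length() > 0) readFileToSet(includesFile, includes);
        if (excludesFile.length() > 0) readFileToSet(excludesFile, excludes);
      }

      // An empty include set means all hosts are permitted (see inHostsList).
      public Set<String> getHosts() { return includes; }
      public Set<String> getExcludedHosts() { return excludes; }
    }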