hadoop-common-commits mailing list archives

From: cutt...@apache.org
Subject: svn commit: r510181 [3/3] - in /lucene/hadoop/trunk: ./ conf/ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/util/ src/test/org/apache/hadoop/dfs/
Date: Wed, 21 Feb 2007 20:11:01 GMT
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?view=diff&rev=510181&r1=510180&r2=510181
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Wed Feb 21 12:11:00 2007
@@ -515,24 +515,12 @@
       return namesystem.isInSafeMode();
     }
 
-    /**
-     * Set administrative commands to decommission datanodes.
+    /*
+     * Refresh the list of datanodes that the namenode should allow to  
+     * connect.  Uses the files list in the configuration to update the list. 
      */
-    public boolean decommission(DecommissionAction action, String[] nodes)
-                                throws IOException {
-      boolean ret = true;
-      switch (action) {
-        case DECOMMISSION_SET: // decommission datanode(s)
-          namesystem.startDecommission(nodes);
-          break;
-        case DECOMMISSION_CLEAR: // remove decommission state of a datanode
-          namesystem.stopDecommission(nodes);
-          break;
-        case DECOMMISSION_GET: // are all the node decommissioned?
-          ret = namesystem.checkDecommissioned(nodes);
-          break;
-        }
-        return ret;
+    public void refreshNodes() throws IOException {
+      namesystem.refreshNodes();
     }
 
     /**
@@ -564,8 +552,12 @@
     public DatanodeRegistration register( DatanodeRegistration nodeReg,
                                           String networkLocation
                                         ) throws IOException {
+      if (!namesystem.verifyNodeRegistration(nodeReg)) {
+        throw new DisallowedDatanodeException( nodeReg );
+      }
       verifyVersion( nodeReg.getVersion() );
       namesystem.registerDatanode( nodeReg, networkLocation );
+      
       return nodeReg;
     }
     
@@ -650,7 +642,8 @@
     /** 
      * Verify request.
      * 
-     * Verifies correctness of the datanode version and registration ID.
+     * Verifies correctness of the datanode version and registration ID, and
+     * checks that the datanode does not need to be shut down.
      * 
      * @param nodeReg data node registration
      * @throws IOException
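
With this change, register() above rejects any datanode the namenode is not configured to accept and throws DisallowedDatanodeException. FSNamesystem.verifyNodeRegistration is not shown in this part of the message; a minimal sketch of the include/exclude rule it is expected to apply, using illustrative names that are not Hadoop API, is:

    import java.util.HashSet;
    import java.util.Set;

    // Illustrative only -- not the actual FSNamesystem code.  A node is
    // admitted when the include list is empty or contains it, and the
    // exclude list does not contain it (the same rule the jobtracker
    // applies further down in this commit).
    class AdmissionRuleSketch {
      private final Set<String> includes = new HashSet<String>();
      private final Set<String> excludes = new HashSet<String>();

      boolean isAllowed(String host) {
        return (includes.isEmpty() || includes.contains(host))
            && !excludes.contains(host);
      }
    }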

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/DisallowedTaskTrackerException.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/DisallowedTaskTrackerException.java?view=auto&rev=510181
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/DisallowedTaskTrackerException.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/DisallowedTaskTrackerException.java Wed Feb 21 12:11:00 2007
@@ -0,0 +1,18 @@
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+
+/**
+ * This exception is thrown when a tasktracker tries to register or communicate
+ * with the jobtracker when it does not appear on the list of included nodes, 
+ * or has been specifically excluded.
+ * 
+ * @author Wendy Chien
+ */
+class DisallowedTaskTrackerException extends IOException {
+
+  public DisallowedTaskTrackerException(TaskTrackerStatus tracker) {
+    super("Tasktracker denied communication with jobtracker: " + tracker.getTrackerName());
+  }
+}
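
Because the jobtracker throws this exception from an RPC handler, the tasktracker sees it wrapped in an org.apache.hadoop.ipc.RemoteException and has to match on the reported class name, which is what the TaskTracker change further down does. A minimal sketch of that caller-side check (the helper class is illustrative, not part of this commit):

    package org.apache.hadoop.mapred;

    import java.io.IOException;
    import org.apache.hadoop.ipc.RemoteException;

    // Illustrative helper: recognizes the denial on the calling side of the
    // RPC by comparing the remote exception's reported class name.
    class DeniedCheckSketch {
      static boolean isDenied(IOException e) {
        return (e instanceof RemoteException) &&
               DisallowedTaskTrackerException.class.getName().equals(
                   ((RemoteException) e).getClassName());
      }
    }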

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=510181&r1=510180&r2=510181
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Wed Feb 21 12:11:00 2007
@@ -23,7 +23,7 @@
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.conf.*;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.*;
 
 import java.io.*;
 import java.net.*;
@@ -449,6 +449,7 @@
     Random r = new Random();
 
     private int maxCurrentTasks;
+    private HostsFileReader hostsReader;
 
     //
     // Properties to maintain while running Jobs and Tasks:
@@ -572,6 +573,10 @@
         // Same with 'localDir' except it's always on the local disk.
         jobConf.deleteLocalFiles(SUBDIR);
 
+        // Read the hosts/exclude files to restrict access to the jobtracker.
+        this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
+                                               conf.get("mapred.hosts.exclude", ""));
+                                           
         // Set ports, start RPC servers, etc.
         InetSocketAddress addr = getAddress(conf);
         this.localMachine = addr.getHostName();
@@ -962,7 +967,12 @@
               " (initialContact: " + initialContact + 
               " acceptNewTasks: " + acceptNewTasks + ")" +
               " with responseId: " + responseId);
-      
+
+        // Make sure heartbeat is from a tasktracker allowed by the jobtracker.
+        if (!acceptTaskTracker(status)) {
+          throw new DisallowedTaskTrackerException(status);
+        }
+
         // First check if the last heartbeat response got through 
         String trackerName = status.getTrackerName();
         HeartbeatResponse prevHeartbeatResponse =
@@ -1033,6 +1043,32 @@
         removeMarkedTasks(trackerName);
         
         return response;
+    }
+    
+    /**
+     * Return whether the specified tasktracker is in the hosts list,
+     * if one was configured.  If no hosts list was configured, this
+     * returns true.
+     */
+    private boolean inHostsList(TaskTrackerStatus status) {
+      Set<String> hostsList = hostsReader.getHosts();
+      return (hostsList.isEmpty() || hostsList.contains(status.getHost()));
+    }
+
+    /**
+     * Return whether the specified tasktracker is in the exclude list.
+     */
+    private boolean inExcludedHostsList(TaskTrackerStatus status) {
+      Set<String> excludeList = hostsReader.getExcludedHosts();
+      return excludeList.contains(status.getHost());
+    }
+
+    /**
+     * Returns true if the tasktracker is in the hosts list and 
+     * not in the exclude list. 
+     */
+    private boolean acceptTaskTracker(TaskTrackerStatus status) {
+      return (inHostsList(status) && !inExcludedHostsList(status));
     }
     
     /**
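
The heartbeat handler now refuses trackers that fail acceptTaskTracker(). The two property names read above, mapred.hosts and mapred.hosts.exclude, point at plain-text host-list files; leaving mapred.hosts unset (the "" default) makes inHostsList() accept every host. A small sketch of wiring the same reader up outside the jobtracker (the file paths are assumptions for illustration and must exist):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.HostsFileReader;

    // Illustrative only: builds the same reader the jobtracker constructs above.
    public class JobTrackerHostsSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("mapred.hosts", "/etc/hadoop/mapred.include");          // assumed path
        conf.set("mapred.hosts.exclude", "/etc/hadoop/mapred.exclude");  // assumed path
        HostsFileReader hostsReader =
            new HostsFileReader(conf.get("mapred.hosts", ""),
                                conf.get("mapred.hosts.exclude", ""));
        System.out.println("include list: " + hostsReader.getHosts());
        System.out.println("exclude list: " + hostsReader.getExcludedHosts());
      }
    }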

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=510181&r1=510180&r2=510181
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Feb 21 12:11:00 2007
@@ -55,7 +55,7 @@
     static final long WAIT_FOR_DONE = 3 * 1000;
     private int httpPort;
 
-    static enum State {NORMAL, STALE, INTERRUPTED}
+    static enum State {NORMAL, STALE, INTERRUPTED, DENIED}
 
     public static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.mapred.TaskTracker");
@@ -529,6 +529,12 @@
             jobClient.reportTaskTrackerError(taskTrackerName, 
                     "DiskErrorException", msg);
             return State.STALE;
+          } catch (RemoteException re) {
+            String reClass = re.getClassName();
+            if (DisallowedTaskTrackerException.class.getName().equals(reClass)) {
+              LOG.info("Tasktracker disallowed by JobTracker.");
+              return State.DENIED;
+            }
           } catch (Exception except) {
             String msg = "Caught exception: " + 
                          StringUtils.stringifyException(except);
@@ -855,14 +861,18 @@
      */
     public void run() {
         try {
-            while (running && !shuttingDown) {
+            boolean denied = false;
+            while (running && !shuttingDown && !denied) {
                 boolean staleState = false;
                 try {
                     // This while-loop attempts reconnects if we get network errors
-                    while (running && ! staleState && !shuttingDown ) {
+                    while (running && ! staleState && !shuttingDown && !denied) {
                         try {
-                            if (offerService() == State.STALE) {
+                            State osState = offerService();
+                            if (osState == State.STALE) {
                                 staleState = true;
+                            } else if (osState == State.DENIED) {
+                                denied = true;
                             }
                         } catch (Exception ex) {
                             if (!shuttingDown) {
@@ -881,6 +891,9 @@
                 if (shuttingDown) { return; }
                 LOG.warn("Reinitializing local state");
                 initialize();
+            }
+            if (denied) {
+                shutdown();
             }
         } catch (IOException iex) {
             LOG.error("Got fatal exception while reinitializing TaskTracker: " +
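
Taken together, the two hunks above make the tasktracker's main loop distinguish STALE (reinitialize and reconnect) from DENIED (stop heartbeating and shut the daemon down). A condensed, self-contained sketch of the resulting control flow, with stand-ins for the real TaskTracker methods:

    // Condensed sketch of the control flow after this change; offerService(),
    // initialize() and shutdown() are stand-ins, not the actual TaskTracker code.
    class TrackerLoopSketch {
      enum State { NORMAL, STALE, INTERRUPTED, DENIED }

      private volatile boolean running = true;
      private volatile boolean shuttingDown = false;

      public void run() {
        boolean denied = false;
        while (running && !shuttingDown && !denied) {
          boolean staleState = false;
          while (running && !staleState && !shuttingDown && !denied) {
            State osState = offerService();
            if (osState == State.STALE) {
              staleState = true;      // drop out, reinitialize, reconnect
            } else if (osState == State.DENIED) {
              denied = true;          // the jobtracker refused this tracker
            }
          }
          if (shuttingDown) { return; }
          initialize();               // rebuild local state before the next attempt
        }
        if (denied) {
          shutdown();                 // a denied tracker exits instead of retrying
        }
      }

      State offerService() { return State.DENIED; }   // stand-in; pretend we were denied
      void initialize() { }                           // stand-in
      void shutdown() { running = false; }            // stand-in
    }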

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/HostsFileReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/HostsFileReader.java?view=auto&rev=510181
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/HostsFileReader.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/HostsFileReader.java Wed Feb 21 12:11:00 2007
@@ -0,0 +1,62 @@
+package org.apache.hadoop.util;
+
+import java.io.*;
+import java.util.Set;
+import java.util.HashSet;
+
+
+// Keeps track of which datanodes are allowed to connect to the namenode.
+public class HostsFileReader {
+  private Set<String> includes;
+  private Set<String> excludes;
+  private String includesFile;
+  private String excludesFile;
+
+  public HostsFileReader(String inFile, 
+                         String exFile) throws IOException {
+    includes = new HashSet<String>();
+    excludes = new HashSet<String>();
+    includesFile = inFile;
+    excludesFile = exFile;
+    refresh();
+  }
+
+  private void readFileToSet(String filename, Set<String> set) throws IOException {
+    FileInputStream fis = new FileInputStream(new File(filename));
+    try {
+      BufferedReader reader = new BufferedReader(new InputStreamReader(fis));
+      String line;
+      while ((line = reader.readLine()) != null) {
+        String[] nodes = line.split("[ \t\n\f\r]+");
+        if (nodes != null) {
+          for (int i = 0; i < nodes.length; i++) {
+            set.add(nodes[i]);  // might need to add canonical name
+          }
+        }
+      }   
+    } finally {
+      fis.close();
+    }  
+  }
+
+  public void refresh() throws IOException {
+    includes.clear();
+    excludes.clear();
+    
+    if (!includesFile.equals("")) {
+        readFileToSet(includesFile, includes);
+    }
+    if (!excludesFile.equals("")) {
+      readFileToSet(excludesFile, excludes);
+    }
+  }
+
+  public Set<String> getHosts() {
+    return includes;
+  }
+
+  public Set<String> getExcludedHosts() {
+    return excludes;
+  }
+
+}
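
The include and exclude files read here are plain text, with hostnames separated by whitespace or newlines; passing "" for a file name simply leaves that list empty. A minimal usage sketch (the exclude-file path is an assumption and the file must exist):

    import java.io.IOException;
    import org.apache.hadoop.util.HostsFileReader;

    // Illustrative usage of the new class.
    public class HostsFileReaderSketch {
      public static void main(String[] args) throws IOException {
        // No include file configured, so every host passes the include check.
        HostsFileReader reader = new HostsFileReader("", "/etc/hadoop/dfs.exclude");
        System.out.println("excluded hosts: " + reader.getExcludedHosts());

        // After an administrator edits the exclude file on disk,
        // re-read both lists in place:
        reader.refresh();
      }
    }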

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java?view=diff&rev=510181&r1=510180&r2=510181
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java Wed Feb 21 12:11:00 2007
@@ -24,6 +24,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FSOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -35,9 +36,25 @@
   static final long seed = 0xDEADBEEFL;
   static final int blockSize = 8192;
   static final int fileSize = 16384;
-  static final int numDatanodes = 4;
+  static final int numDatanodes = 5;
 
   Random myrand = new Random();
+  Path hostsFile;
+  Path excludeFile;
+
+  private enum NodeState {NORMAL, DECOMMISSION_INPROGRESS, DECOMMISSIONED; }
+
+  private void writeConfigFile(FileSystem fs, Path name, String node) 
+      throws IOException {
+    // delete if it already exists
+    if (fs.exists(name)) {
+      fs.delete(name);
+    }
+    FSDataOutputStream stm = fs.create(name);
+    stm.writeBytes(node);
+    stm.writeBytes("\n");
+    stm.close();
+  }
 
   private void writeFile(FileSystem fileSys, Path name, int repl)
   throws IOException {
@@ -66,7 +83,11 @@
    * replication factor is 1 more than the specified one.
    */
   private void checkFile(FileSystem fileSys, Path name, int repl,
-                         String[] downnodes) throws IOException {
+                         String downnode) throws IOException {
+    //
+    // sleep an additional 10 seconds for the blockreports from the datanodes
+    // to arrive. 
+    //
     FSInputStream is = fileSys.openRaw(name);
     DFSClient.DFSInputStream dis = (DFSClient.DFSInputStream) is;
     DatanodeInfo[][] dinfo = dis.getDataNodes();
@@ -75,12 +96,10 @@
       int hasdown = 0;
       DatanodeInfo[] nodes = dinfo[blk];
       for (int j = 0; j < nodes.length; j++) {     // for each replica
-        for (int k = 0; downnodes != null && k < downnodes.length; k++) {
-          if (nodes[j].getName().equals(downnodes[k])) {
-            hasdown++;
-            System.out.println("Block " + blk + " replica " +
-                               nodes[j].getName() + " is decommissioned.");
-          }
+        if (nodes[j].getName().equals(downnode)) {
+          hasdown++;
+          System.out.println("Block " + blk + " replica " +
+                             nodes[j].getName() + " is decommissioned.");
         }
       }
       System.out.println("Block " + blk + " has " + hasdown +
@@ -107,74 +126,92 @@
   /*
    * decommission one random node.
    */
-  private String[] decommissionNode(DFSClient client, FileSystem filesys)
-                                    throws IOException {
+  private String decommissionNode(DFSClient client, 
+                                  FileSystem filesys,
+                                  FileSystem localFileSys)
+      throws IOException {
     DistributedFileSystem dfs = (DistributedFileSystem) filesys;
     DatanodeInfo[] info = client.datanodeReport();
 
     //
     // pick one datanode randomly.
     //
-    int index = myrand.nextInt(info.length);
+    int index = 0;
+    boolean found = false;
+    while (!found) {
+      index = myrand.nextInt(info.length);
+      if (!info[index].isDecommissioned()) {
+        found = true;
+      }
+    }
     String nodename = info[index].getName();
     System.out.println("Decommissioning node: " + nodename);
-    String[] nodes = new String[1];
-    nodes[0] = nodename;
-    dfs.decommission(FSConstants.DecommissionAction.DECOMMISSION_SET, nodes);
-    return nodes;
+
+    // write nodename into the exclude file. 
+    writeConfigFile(localFileSys, excludeFile, nodename);
+    dfs.refreshNodes();
+    return nodename;
   }
 
   /*
    * put node back in action
    */
-  private void commissionNode(DFSClient client, FileSystem filesys,
-                              String[] nodes) throws IOException {
+  private void commissionNode(FileSystem filesys, FileSystem localFileSys,
+                              String node) throws IOException {
     DistributedFileSystem dfs = (DistributedFileSystem) filesys;
-    DatanodeInfo[] info = client.datanodeReport();
 
-    for (int i = 0; i < nodes.length; i++) {
-      System.out.println("Putting node back in action: " + nodes[i]);
-    }
-    dfs.decommission(FSConstants.DecommissionAction.DECOMMISSION_CLEAR, nodes);
+    System.out.println("Commissioning nodes.");
+    writeConfigFile(localFileSys, excludeFile, "");
+    dfs.refreshNodes();
   }
 
-  /* 
-   * Check that node(s) were decommissioned
+  /*
+   * Check if node is in the requested state.
    */
-  private void checkNodeDecommission(DFSClient client, FileSystem filesys,
-                                     String[] nodes) throws IOException {
+  private boolean checkNodeState(FileSystem filesys, 
+                                 String node, 
+                                 NodeState state) throws IOException {
     DistributedFileSystem dfs = (DistributedFileSystem) filesys;
-    boolean ret = dfs.decommission(
-                    FSConstants.DecommissionAction.DECOMMISSION_GET, nodes);
-    assertEquals("State of Decommissioned Datanode(s) ", ret, true);
+    boolean done = false;
+    boolean foundNode = false;
+    DatanodeInfo[] datanodes = dfs.getDataNodeStats();
+    for (int i = 0; i < datanodes.length; i++) {
+      DatanodeInfo dn = datanodes[i];
+      if (dn.getName().equals(node)) {
+        if (state == NodeState.DECOMMISSIONED) {
+          done = dn.isDecommissioned();
+        } else if (state == NodeState.DECOMMISSION_INPROGRESS) {
+          done = dn.isDecommissionInProgress();
+        } else {
+          done = (!dn.isDecommissionInProgress() && !dn.isDecommissioned());
+        }
+        System.out.println(dn.getDatanodeReport());
+        foundNode = true;
+      }
+    }
+    if (!foundNode) {
+      throw new IOException("Could not find node: " + node);
+    }
+    return done;
   }
 
   /* 
    * Wait till node is fully decommissioned.
    */
-  private void waitNodeDecommission(DFSClient client, FileSystem filesys,
-                                     String[] nodes) throws IOException {
+  private void waitNodeState(FileSystem filesys,
+                             String node,
+                             NodeState state) throws IOException {
     DistributedFileSystem dfs = (DistributedFileSystem) filesys;
-    boolean done = dfs.decommission(
-                     FSConstants.DecommissionAction.DECOMMISSION_GET, nodes);
+    boolean done = checkNodeState(filesys, node, state);
     while (!done) {
-      System.out.println("Waiting for nodes " + nodes[0] +
-                         " to be fully decommissioned...");
+      System.out.println("Waiting for node " + node +
+                         " to change state...");
       try {
-        Thread.sleep(5000L);
+        Thread.sleep(1000);
       } catch (InterruptedException e) {
         // nothing
       }
-      done = dfs.decommission(FSConstants.DecommissionAction.DECOMMISSION_GET,
-                              nodes);
-    }
-    //
-    // sleep an additional 10 seconds for the blockreports from the datanodes
-    // to arrive. 
-    //
-    try {
-      Thread.sleep(10 * 1000L);
-    } catch (Exception e) {
+      done = checkNodeState(filesys, node, state);
     }
   }
   
@@ -183,6 +220,17 @@
    */
   public void testDecommission() throws IOException {
     Configuration conf = new Configuration();
+
+    // Set up the hosts/exclude files.
+    FileSystem localFileSys = FileSystem.getLocal(conf);
+    Path workingDir = localFileSys.getWorkingDirectory();
+    Path dir = new Path(workingDir, "build/test/data/work-dir/decommission");
+    assertTrue(localFileSys.mkdirs(dir));
+    hostsFile = new Path(dir, "hosts");
+    excludeFile = new Path(dir, "exclude");
+    conf.set("dfs.hosts.exclude", excludeFile.toString());
+    writeConfigFile(localFileSys, excludeFile, "");
+
     MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, numDatanodes, false);
     // Now wait for 15 seconds to give datanodes chance to register
     // themselves and to report heartbeat
@@ -209,11 +257,16 @@
         Path file1 = new Path("smallblocktest.dat");
         writeFile(fileSys, file1, 3);
         checkFile(fileSys, file1, 3);
-        String downnodes[] = decommissionNode(client, fileSys);
-        waitNodeDecommission(client, fileSys, downnodes);
-        checkFile(fileSys, file1, 3, downnodes);
-        commissionNode(client, fileSys, downnodes);
+
+        String downnode  = decommissionNode(client, fileSys, localFileSys);
+        waitNodeState(fileSys, downnode, NodeState.DECOMMISSION_INPROGRESS);
+        commissionNode(fileSys, localFileSys, downnode);
+        waitNodeState(fileSys, downnode, NodeState.NORMAL);
+        downnode  = decommissionNode(client, fileSys, localFileSys);
+        waitNodeState(fileSys, downnode, NodeState.DECOMMISSIONED);
+        checkFile(fileSys, file1, 3, downnode);
         cleanupFile(fileSys, file1);
+        cleanupFile(localFileSys, dir);
       }
     } catch (IOException e) {
       info = client.datanodeReport();
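
What the reworked test drives, end to end, is the new administrative decommission path: write the datanode's name (host:port, as reported by getName()) into the file named by dfs.hosts.exclude, call refreshNodes(), then poll the datanode report until isDecommissioned() turns true. A condensed, illustrative version of that sequence outside the test harness (the path and node name are assumptions, and the namenode must have been started with dfs.hosts.exclude pointing at the same file):

    package org.apache.hadoop.dfs;

    import java.io.FileWriter;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    // Condensed, illustrative version of the sequence the test performs; the
    // exclude-file path and node name are assumptions, fs.default.name must
    // point at the cluster, and the class sits in org.apache.hadoop.dfs only
    // so the same types the test uses are visible.
    public class DecommissionSketch {
      public static void main(String[] args) throws IOException, InterruptedException {
        String excluded = "datanode1.example.com:50010";        // assumed node name

        // 1. Add the datanode to the exclude file the namenode was started with.
        FileWriter out = new FileWriter("/etc/hadoop/dfs.exclude");
        out.write(excluded + "\n");
        out.close();

        // 2. Tell the namenode to re-read its include/exclude files.
        Configuration conf = new Configuration();
        DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
        dfs.refreshNodes();

        // 3. Poll the datanode report until the node is fully decommissioned.
        boolean done = false;
        while (!done) {
          DatanodeInfo[] report = dfs.getDataNodeStats();
          for (int i = 0; i < report.length; i++) {
            if (report[i].getName().equals(excluded)) {
              done = report[i].isDecommissioned();
            }
          }
          if (!done) {
            Thread.sleep(1000);
          }
        }
        System.out.println(excluded + " is decommissioned.");
      }
    }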


