hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject svn commit: r1457883 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: CHANGES.txt src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java src/test/java/org/apache/hadoop/net/TestNetworkTopology.java
Date Mon, 18 Mar 2013 17:26:46 GMT
Author: atm
Date: Mon Mar 18 17:26:45 2013
New Revision: 1457883

URL: http://svn.apache.org/r1457883
Log:
HDFS-4521. Invalid network topologies should not be cached. Contributed by Colin Patrick McCabe.

Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1457883&r1=1457882&r2=1457883&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Mar 18 17:26:45 2013
@@ -23,6 +23,9 @@ Release 2.0.5-beta - UNRELEASED
     HDFS-4569. Small image transfer related cleanups.
     (Andrew Wang via suresh)
 
+    HDFS-4521. Invalid network topologies should not be cached. (Colin Patrick
+    McCabe via atm)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1457883&r1=1457882&r2=1457883&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Mon Mar 18 17:26:45 2013
@@ -66,6 +66,7 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.CachedDNSToSwitchMapping;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
 import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.HostsFileReader;
@@ -418,8 +419,8 @@ public class DatanodeManager {
       host2DatanodeMap.remove(datanodeMap.put(node.getStorageID(), node));
     }
 
+    networktopology.add(node); // may throw InvalidTopologyException
     host2DatanodeMap.add(node);
-    networktopology.add(node);
     checkIfClusterIsNowMultiRack(node);
 
     if (LOG.isDebugEnabled()) {
@@ -634,92 +635,122 @@ public class DatanodeManager {
       nodeReg.setIpAddr(ip);
       nodeReg.setPeerHostName(hostname);
     }
-
-    nodeReg.setExportedKeys(blockManager.getBlockKeys());
-
-    // Checks if the node is not on the hosts list.  If it is not, then
-    // it will be disallowed from registering. 
-    if (!inHostsList(nodeReg)) {
-      throw new DisallowedDatanodeException(nodeReg);
-    }
-      
-    NameNode.stateChangeLog.info("BLOCK* registerDatanode: from "
-        + nodeReg + " storage " + nodeReg.getStorageID());
-
-    DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
-    DatanodeDescriptor nodeN = host2DatanodeMap.getDatanodeByXferAddr(
-        nodeReg.getIpAddr(), nodeReg.getXferPort());
-      
-    if (nodeN != null && nodeN != nodeS) {
-      NameNode.LOG.info("BLOCK* registerDatanode: " + nodeN);
-      // nodeN previously served a different data storage, 
-      // which is not served by anybody anymore.
-      removeDatanode(nodeN);
-      // physically remove node from datanodeMap
-      wipeDatanode(nodeN);
-      nodeN = null;
-    }
-
-    if (nodeS != null) {
-      if (nodeN == nodeS) {
-        // The same datanode has been just restarted to serve the same data 
-        // storage. We do not need to remove old data blocks, the delta will
-        // be calculated on the next block report from the datanode
-        if(NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug("BLOCK* registerDatanode: "
-              + "node restarted.");
+    
+    try {
+      nodeReg.setExportedKeys(blockManager.getBlockKeys());
+  
+      // Checks if the node is not on the hosts list.  If it is not, then
+      // it will be disallowed from registering. 
+      if (!inHostsList(nodeReg)) {
+        throw new DisallowedDatanodeException(nodeReg);
+      }
+        
+      NameNode.stateChangeLog.info("BLOCK* registerDatanode: from "
+          + nodeReg + " storage " + nodeReg.getStorageID());
+  
+      DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
+      DatanodeDescriptor nodeN = host2DatanodeMap.getDatanodeByXferAddr(
+          nodeReg.getIpAddr(), nodeReg.getXferPort());
+        
+      if (nodeN != null && nodeN != nodeS) {
+        NameNode.LOG.info("BLOCK* registerDatanode: " + nodeN);
+        // nodeN previously served a different data storage, 
+        // which is not served by anybody anymore.
+        removeDatanode(nodeN);
+        // physically remove node from datanodeMap
+        wipeDatanode(nodeN);
+        nodeN = null;
+      }
+  
+      if (nodeS != null) {
+        if (nodeN == nodeS) {
+          // The same datanode has been just restarted to serve the same data 
+          // storage. We do not need to remove old data blocks, the delta will
+          // be calculated on the next block report from the datanode
+          if(NameNode.stateChangeLog.isDebugEnabled()) {
+            NameNode.stateChangeLog.debug("BLOCK* registerDatanode: "
+                + "node restarted.");
+          }
+        } else {
+          // nodeS is found
+          /* The registering datanode is a replacement node for the existing 
+            data storage, which from now on will be served by a new node.
+            If this message repeats, both nodes might have same storageID 
+            by (insanely rare) random chance. User needs to restart one of the
+            nodes with its data cleared (or user can just remove the StorageID
+            value in "VERSION" file under the data directory of the datanode,
+            but this is might not work if VERSION file format has changed 
+         */        
+          NameNode.stateChangeLog.info("BLOCK* registerDatanode: " + nodeS
+              + " is replaced by " + nodeReg + " with the same storageID "
+              + nodeReg.getStorageID());
         }
-      } else {
-        // nodeS is found
-        /* The registering datanode is a replacement node for the existing 
-          data storage, which from now on will be served by a new node.
-          If this message repeats, both nodes might have same storageID 
-          by (insanely rare) random chance. User needs to restart one of the
-          nodes with its data cleared (or user can just remove the StorageID
-          value in "VERSION" file under the data directory of the datanode,
-          but this is might not work if VERSION file format has changed 
-       */        
-        NameNode.stateChangeLog.info("BLOCK* registerDatanode: " + nodeS
-            + " is replaced by " + nodeReg + " with the same storageID "
-            + nodeReg.getStorageID());
-      }
-      // update cluster map
-      getNetworkTopology().remove(nodeS);
-      nodeS.updateRegInfo(nodeReg);
-      nodeS.setDisallowed(false); // Node is in the include list
+        
+        boolean success = false;
+        try {
+          // update cluster map
+          getNetworkTopology().remove(nodeS);
+          nodeS.updateRegInfo(nodeReg);
+          nodeS.setDisallowed(false); // Node is in the include list
+          
+          // resolve network location
+          resolveNetworkLocation(nodeS);
+          getNetworkTopology().add(nodeS);
+            
+          // also treat the registration message as a heartbeat
+          heartbeatManager.register(nodeS);
+          checkDecommissioning(nodeS);
+          success = true;
+        } finally {
+          if (!success) {
+            removeDatanode(nodeS);
+            wipeDatanode(nodeS);
+          }
+        }
+        return;
+      } 
+  
+      // this is a new datanode serving a new data storage
+      if ("".equals(nodeReg.getStorageID())) {
+        // this data storage has never been registered
+        // it is either empty or was created by pre-storageID version of DFS
+        nodeReg.setStorageID(newStorageID());
+        if (NameNode.stateChangeLog.isDebugEnabled()) {
+          NameNode.stateChangeLog.debug(
+              "BLOCK* NameSystem.registerDatanode: "
+              + "new storageID " + nodeReg.getStorageID() + " assigned.");
+        }
+      }
       
-      // resolve network location
-      resolveNetworkLocation(nodeS);
-      getNetworkTopology().add(nodeS);
+      DatanodeDescriptor nodeDescr 
+        = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK);
+      boolean success = false;
+      try {
+        resolveNetworkLocation(nodeDescr);
+        networktopology.add(nodeDescr);
+  
+        // register new datanode
+        addDatanode(nodeDescr);
+        checkDecommissioning(nodeDescr);
         
-      // also treat the registration message as a heartbeat
-      heartbeatManager.register(nodeS);
-      checkDecommissioning(nodeS);
-      return;
-    } 
-
-    // this is a new datanode serving a new data storage
-    if ("".equals(nodeReg.getStorageID())) {
-      // this data storage has never been registered
-      // it is either empty or was created by pre-storageID version of DFS
-      nodeReg.setStorageID(newStorageID());
-      if (NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug(
-            "BLOCK* NameSystem.registerDatanode: "
-            + "new storageID " + nodeReg.getStorageID() + " assigned.");
-      }
-    }
-    // register new datanode
-    DatanodeDescriptor nodeDescr 
-      = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK);
-    resolveNetworkLocation(nodeDescr);
-    addDatanode(nodeDescr);
-    checkDecommissioning(nodeDescr);
-    
-    // also treat the registration message as a heartbeat
-    // no need to update its timestamp
-    // because its is done when the descriptor is created
-    heartbeatManager.addDatanode(nodeDescr);
+        // also treat the registration message as a heartbeat
+        // no need to update its timestamp
+        // because its is done when the descriptor is created
+        heartbeatManager.addDatanode(nodeDescr);
+        success = true;
+      } finally {
+        if (!success) {
+          removeDatanode(nodeDescr);
+          wipeDatanode(nodeDescr);
+        }
+      }
+    } catch (InvalidTopologyException e) {
+      // If the network location is invalid, clear the cached mappings
+      // so that we have a chance to re-add this DataNode with the
+      // correct network location later.
+      dnsToSwitchMapping.reloadCachedMappings();
+      throw e;
+    }
   }
 
   /**

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java?rev=1457883&r1=1457882&r2=1457883&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java Mon Mar 18 17:26:45 2013
@@ -26,12 +26,23 @@ import static org.junit.Assert.fail;
 import java.util.HashMap;
 import java.util.Map;
 
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.junit.Before;
 import org.junit.Test;
 
 public class TestNetworkTopology {
+  private static final Log LOG = LogFactory.getLog(TestNetworkTopology.class);
   private final static NetworkTopology cluster = new NetworkTopology();
   private DatanodeDescriptor dataNodes[];
   
@@ -213,4 +224,65 @@ public class TestNetworkTopology {
       }
     }
   }
+
+  @Test(timeout=180000)
+  public void testInvalidNetworkTopologiesNotCachedInHdfs() throws Exception {
+    // start a cluster
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    try {
+      // bad rack topology
+      String racks[] = { "/a/b", "/c" };
+      String hosts[] = { "foo1.example.com", "foo2.example.com" };
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).
+          racks(racks).hosts(hosts).build();
+      cluster.waitActive();
+      
+      NamenodeProtocols nn = cluster.getNameNodeRpc();
+      Assert.assertNotNull(nn);
+      
+      // Wait for one DataNode to register.
+      // The other DataNode will not be able to register because of the rack mismatch.
+      DatanodeInfo[] info;
+      while (true) {
+        info = nn.getDatanodeReport(DatanodeReportType.LIVE);
+        Assert.assertFalse(info.length == 2);
+        if (info.length == 1) {
+          break;
+        }
+        Thread.sleep(1000);
+      }
+      // Set the network topology of the other node to match the network
+      // topology of the node that came up.
+      int validIdx = info[0].getHostName().equals(hosts[0]) ? 0 : 1;
+      int invalidIdx = validIdx == 1 ? 0 : 1;
+      StaticMapping.addNodeToRack(hosts[invalidIdx], racks[validIdx]);
+      LOG.info("datanode " + validIdx + " came up with network location " + 
+        info[0].getNetworkLocation());
+
+      // Restart the DN with the invalid topology and wait for it to register.
+      cluster.restartDataNode(invalidIdx);
+      Thread.sleep(5000);
+      while (true) {
+        info = nn.getDatanodeReport(DatanodeReportType.LIVE);
+        if (info.length == 2) {
+          break;
+        }
+        if (info.length == 0) {
+          LOG.info("got no valid DNs");
+        } else if (info.length == 1) {
+          LOG.info("got one valid DN: " + info[0].getHostName() +
+              " (at " + info[0].getNetworkLocation() + ")");
+        }
+        Thread.sleep(1000);
+      }
+      Assert.assertEquals(info[0].getNetworkLocation(),
+                          info[1].getNetworkLocation());
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
 }



Mime
View raw message