hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiten...@apache.org
Subject svn commit: r1075599 - in /hadoop/hdfs/branches/HDFS-1052: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/java/org/apache/hadoop/hdfs/server/protocol/ src/java/org/apac...
Date Tue, 01 Mar 2011 00:35:29 GMT
Author: jitendra
Date: Tue Mar  1 00:35:28 2011
New Revision: 1075599

URL: http://svn.apache.org/viewvc?rev=1075599&view=rev
Log:
Federation: Datanode command to refresh namenode list at the datanode.

Added:
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeShutdown.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java
Modified:
    hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSUtil.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java

Modified: hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/CHANGES.txt?rev=1075599&r1=1075598&r2=1075599&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-1052/CHANGES.txt Tue Mar  1 00:35:28 2011
@@ -66,6 +66,9 @@ Trunk (unreleased changes)
     HDFS-1651. Tests fail due to null pointer exception in 
     Datnode#shutdown() method. (Tanping via suresh)
 
+    HDFS-1649. Federation: Datanode command to refresh namenode list at 
+    the datanode. (jitendra)
+
   IMPROVEMENTS
 
     HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel)

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1075599&r1=1075598&r2=1075599&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSUtil.java Tue Mar  1 00:35:28 2011
@@ -291,17 +291,16 @@ public class DFSUtil {
    * @return Array of InetSocketAddresses
    * @throws IOException
    */
-  public static InetSocketAddress[] getNNAddresses(Configuration conf)
+  public static List<InetSocketAddress> getNNAddresses(Configuration conf)
       throws IOException {
     List<URI> nns = getNamenodeList(conf);
     if (nns == null) {
       throw new IOException("Federation namnodes are not configured correctly");
     }
 
-    InetSocketAddress[] isas = new InetSocketAddress[nns.size()];
-    int i = 0;
+    List<InetSocketAddress> isas = new ArrayList<InetSocketAddress>();
     for (URI u : nns) {
-      isas[i++] = NameNode.getAddress(u);
+      isas.add(NameNode.getAddress(u));
     }
     return isas;
   }

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=1075599&r1=1075598&r2=1075599&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Tue Mar  1 00:35:28 2011
@@ -23,23 +23,35 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
 import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.token.TokenInfo;
 
 /** An client-datanode protocol for block recovery
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
+@KerberosInfo(
+    serverPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY)
 @TokenInfo(BlockTokenSelector.class)
 public interface ClientDatanodeProtocol extends VersionedProtocol {
   public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
 
   /**
-   * 7: Add block pool ID to Block
+   * 8: Add refreshNamenodes method
    */
-  public static final long versionID = 7L;
+  public static final long versionID = 8L;
 
   /** Return the visible length of a replica. */
   long getReplicaVisibleLength(ExtendedBlock b) throws IOException;
+  
+  /**
+   * Refresh the list of federated namenodes from the updated configuration.
+   * Adds new namenodes and stops the deleted namenodes.
+   * 
+   * @throws IOException on error
+   **/
+  void refreshNamenodes() throws IOException;
 }

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1075599&r1=1075598&r2=1075599&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Mar  1 00:35:28 2011
@@ -196,15 +196,128 @@ public class DataNode extends Configured
     return NetUtils.createSocketAddr(target);
   }
   
-  BPOfferService[] nameNodeThreads;
-  private Map<String, BPOfferService> bpMapping = 
-    new HashMap<String, BPOfferService>();
+  /**
+   * Manages the BPOfferService objects for the data node.
+   * Creation, removal, starting, stopping, and shutdown of BPOfferService
+   * objects must be done via APIs in this class.
+   */
+  @InterfaceAudience.Private
+  class BlockPoolManager {
+    private final Map<String, BPOfferService> bpMapping;
+    private final Map<InetSocketAddress, BPOfferService> nameNodeThreads;
+    private final DatanodeRegistration dnReg;
+    
+    //This lock is used only to ensure exclusion of refreshNamenodes
+    private final Object refreshNamenodesLock = new Object();
+    
+    BlockPoolManager(Configuration conf, DatanodeRegistration dnReg)
+        throws IOException {
+      this.dnReg = dnReg;
+      bpMapping = new HashMap<String, BPOfferService>();
+      nameNodeThreads = new HashMap<InetSocketAddress, BPOfferService>();
+  
+      List<InetSocketAddress> isas = DFSUtil.getNNAddresses(conf);
+      
+      for(InetSocketAddress isa : isas) {
+        BPOfferService bpos = new BPOfferService(isa, dnReg);
+        nameNodeThreads.put(bpos.getNNSocketAddress(), bpos);
+      }
+    }
+    
+    synchronized void addBlockPool(BPOfferService t) {
+      if (nameNodeThreads.get(t.getNNSocketAddress()) == null) {
+        throw new IllegalArgumentException(
+            "Unknown BPOfferService thread for namenode address:"
+                + t.getNNSocketAddress());
+      }
+      if (t.getBlockPoolId() == null) {
+        throw new IllegalArgumentException("Null blockpool id");
+      }
+      bpMapping.put(t.getBlockPoolId(), t);
+    }
+    
+    /**
+     * Returns the array of BPOfferService objects.
+     * Caution: The BPOfferService returned could be shut down at any time.
+     */
+    synchronized BPOfferService[] getAllNamenodeThreads() {
+      BPOfferService[] bposArray = new BPOfferService[nameNodeThreads.values()
+          .size()];
+      return nameNodeThreads.values().toArray(bposArray);
+    }
+    
+    synchronized BPOfferService get(String bpid) {
+      return bpMapping.get(bpid);
+    }
+    
+    synchronized void remove(BPOfferService t) {
+      nameNodeThreads.remove(t.getNNSocketAddress());
+      bpMapping.remove(t.getBlockPoolId());
+    }
+    
+    void shutDownAll() throws InterruptedException {
+      BPOfferService[] bposArray = this.getAllNamenodeThreads();
+      for (BPOfferService bpos : bposArray) {
+        bpos.stop();
+      }
+    }
+    
+    synchronized void startAll() {
+      for (BPOfferService bpos: nameNodeThreads.values()) {
+        bpos.start();
+      }
+    }
+    
+    void joinAll() throws InterruptedException {
+      for (BPOfferService bpos: this.getAllNamenodeThreads()) {
+        bpos.join();
+      }
+    }
+    
+    void refreshNamenodes(Configuration conf)
+        throws IOException, InterruptedException {
+      List<InetSocketAddress> newAddresses = DFSUtil.getNNAddresses(conf);
+      List<BPOfferService> toShutdown = new ArrayList<BPOfferService>();
+      List<InetSocketAddress> toStart = new ArrayList<InetSocketAddress>();
+      synchronized (refreshNamenodesLock) {
+        synchronized (this) {
+          for (InetSocketAddress nnaddr : nameNodeThreads.keySet()) {
+            if (!(newAddresses.contains(nnaddr))) {
+              toShutdown.add(nameNodeThreads.get(nnaddr));
+            }
+          }
+          for (InetSocketAddress nnaddr : newAddresses) {
+            if (!(nameNodeThreads.containsKey(nnaddr))) {
+              toStart.add(nnaddr);
+            }
+          }
+
+          for (InetSocketAddress nnaddr : toStart) {
+            BPOfferService bpos = new BPOfferService(nnaddr, dnReg);
+            nameNodeThreads.put(bpos.getNNSocketAddress(), bpos);
+          }
+
+          for (BPOfferService bpos : toShutdown) {
+            remove(bpos);
+          }
+        }
+
+        for (BPOfferService bpos : toShutdown) {
+          bpos.stop();
+        }
+        // Now start the threads that are not already running.
+        startAll();
+      }
+    }
+  }
+  
+  volatile boolean shouldRun = true;
+  private BlockPoolManager blockPoolManager;
   public DatanodeProtocol namenodeTODO_FED = null; //TODO:FEDERATION needs to be taken out.
   public FSDatasetInterface data = null;
   public DatanodeRegistration dnRegistration = null;
   private String clusterId = null;
 
-  volatile boolean shouldRun = true;
   public final static String EMPTY_DEL_HINT = "";
   AtomicInteger xmitsInProgress = new AtomicInteger();
   Daemon dataXceiverServer = null;
@@ -429,7 +542,7 @@ public class DataNode extends Configured
   
   // calls specific to BP
   protected void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
-    BPOfferService bpos = bpMapping.get(block.getPoolId());
+    BPOfferService bpos = blockPoolManager.get(block.getPoolId());
     if(bpos != null) {
       bpos.notifyNamenodeReceivedBlock(block, delHint); 
     } else {
@@ -439,7 +552,7 @@ public class DataNode extends Configured
   }
   
   public void reportBadBlocks(ExtendedBlock block) throws IOException{
-    BPOfferService bpos = bpMapping.get(block.getPoolId());
+    BPOfferService bpos = blockPoolManager.get(block.getPoolId());
     if(bpos == null || bpos.bpNamenode == null) {
       throw new IOException("cannot locate OfferService thread for bp="+block.getPoolId());
     }
@@ -456,7 +569,7 @@ public class DataNode extends Configured
    * </ul>
    */
   class BPOfferService implements Runnable {
-    final InetSocketAddress nn_addr;
+    final InetSocketAddress nnAddr;
     DatanodeRegistration bpRegistration;
     NamespaceInfo bpNSInfo;
     long lastBlockReport = 0;
@@ -467,10 +580,11 @@ public class DataNode extends Configured
     private boolean initialized = false;
     private final LinkedList<Block> receivedBlockList = new LinkedList<Block>();
     private final LinkedList<String> delHints = new LinkedList<String>();
+    volatile private boolean shouldServiceRun = true;
 
     BPOfferService(InetSocketAddress isa, DatanodeRegistration bpRegistration) {
-      this.bpRegistration = bpRegistration;
-      this.nn_addr = isa;
+      this.bpRegistration = new DatanodeRegistration(bpRegistration);
+      this.nnAddr = isa;
     }
 
     /**
@@ -484,11 +598,15 @@ public class DataNode extends Configured
     public String getBlockPoolId() {
       return blockPoolId;
     }
+    
+    private InetSocketAddress getNNSocketAddress() {
+      return nnAddr;
+    }
  
     void setNamespaceInfo(NamespaceInfo nsinfo) {
       bpNSInfo = nsinfo;
       this.blockPoolId = nsinfo.getBlockPoolID();
-      bpMapping.put(blockPoolId, this);
+      blockPoolManager.addBlockPool(this);
     }
 
     void setNameNode(DatanodeProtocol dnProtocol) {
@@ -497,12 +615,12 @@ public class DataNode extends Configured
 
     private NamespaceInfo handshake() throws IOException {
       NamespaceInfo nsInfo = new NamespaceInfo();
-      while (shouldRun) {
+      while (shouldRun && shouldServiceRun) {
         try {
           nsInfo = bpNamenode.versionRequest();
           break;
         } catch(SocketTimeoutException e) {  // namenode is busy
-          LOG.info("Problem connecting to server: " + nn_addr);
+          LOG.info("Problem connecting to server: " + nnAddr);
           try {
             Thread.sleep(1000);
           } catch (InterruptedException ie) {}
@@ -521,7 +639,7 @@ public class DataNode extends Configured
           bpNamenode.errorReport( bpRegistration,
               DatanodeProtocol.NOTIFY, errorMsg );
         } catch( SocketTimeoutException e ) {  // namenode is busy
-          LOG.info("Problem connecting to server: " + nn_addr);
+          LOG.info("Problem connecting to server: " + nnAddr);
         }
         throw new IOException( errorMsg );
       }
@@ -538,8 +656,8 @@ public class DataNode extends Configured
       // get NN proxy
       DatanodeProtocol dnp = 
         (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class,
-            DatanodeProtocol.versionID, nn_addr, conf);
-      LOG.info("NN proxy created in BP="+blockPoolId + " for " + nn_addr);
+            DatanodeProtocol.versionID, nnAddr, conf);
+      LOG.info("NN proxy created in BP="+blockPoolId + " for " + nnAddr);
       setNameNode(dnp);
 
       // handshake with NN
@@ -560,7 +678,7 @@ public class DataNode extends Configured
         bpRegistration.storageInfo.namespaceID = bpNSInfo.namespaceID;
         bpRegistration.storageInfo.clusterID = bpNSInfo.clusterID;
         // TODO: FEDERATION 
-        // bpRegistration.storageInfo.blockpoolID = bpNSInfo.blockpoolID;
+        //bpRegistration.storageInfo.blockpoolID = bpNSInfo.blockpoolID;
       } else {
         // read storage info, lock data dirs and transition fs state if necessary       
  
         storage.recoverTransitionRead(blockPoolId, bpNSInfo, dataDirs, startOpt);
@@ -726,14 +844,51 @@ public class DataNode extends Configured
           xmitsInProgress.get(),
           getXceiverCount());
     }
-
+    
+    //This must be called only by blockPoolManager
+    void start() {
+      if ((bpThread != null) && (bpThread.isAlive())) {
+        //Thread is started already
+        return;
+      }
+      bpThread = new Thread(this, dnThreadName);
+      bpThread.setDaemon(true); // needed for JUnit testing
+      bpThread.start();
+    }
+    
+    //This must be called only by blockPoolManager.
+    void stop() {
+      shouldServiceRun = false;
+      if (bpThread != null) {
+        try {
+          bpThread.interrupt();
+          bpThread.join();
+        } catch (InterruptedException ex) {
+          LOG.warn("Received exception: ", ex);
+        }
+      }
+    }
+    
+    //This must be called only by blockPoolManager
+    void join() throws InterruptedException {
+      if (bpThread != null) {
+        bpThread.join();
+      }
+    }
+    
+    //Cleanup method to be called by current thread before exiting.
+    private void cleanUp() {
+      blockPoolManager.remove(this);
+      shouldServiceRun = false;
+      RPC.stopProxy(bpNamenode);
+    }
 
     /**
      * Main loop for each BP thread. Run until shutdown,
      * forever calling remote NameNode functions.
      */
     private void offerService() throws Exception {
-      LOG.info("For namenode " + nn_addr + " using BLOCKREPORT_INTERVAL of "
+      LOG.info("For namenode " + nnAddr + " using BLOCKREPORT_INTERVAL of "
           + blockReportInterval + "msec" + " Initial delay: "
           + initialBlockReportDelay + "msec" + "; heartBeatInterval="
           + heartBeatInterval);
@@ -741,7 +896,7 @@ public class DataNode extends Configured
       //
       // Now loop for a long time....
       //
-      while (shouldRun) {
+      while (shouldRun && shouldServiceRun) {
         try {
           long startTime = now();
 
@@ -791,6 +946,8 @@ public class DataNode extends Configured
               try {
                 receivedBlockList.wait(waitTime);
               } catch (InterruptedException ie) {
+                LOG.warn("BPOfferService for block pool="
+                    + this.getBlockPoolId() + " received exception:" + ie);
               }
             }
           } // synchronized
@@ -801,7 +958,6 @@ public class DataNode extends Configured
               IncorrectVersionException.class.getName().equals(reClass)) {
             LOG.warn("DataNode is shutting down: " + 
                 StringUtils.stringifyException(re));
-            shutdown();  // TODO:FEDERATION - ??? what to do here
             return;
           }
           LOG.warn(StringUtils.stringifyException(re));
@@ -813,8 +969,10 @@ public class DataNode extends Configured
           }
         } catch (IOException e) {
           LOG.warn(StringUtils.stringifyException(e));
+        } finally {
+          shouldServiceRun = false;
         }
-      } // while (shouldRun)
+      } // while (shouldRun && shouldServiceRun)
     } // offerService
 
 
@@ -835,7 +993,7 @@ public class DataNode extends Configured
       LOG.info("in register: sid=" + bpRegistration.getStorageID() + ";SI="
           + bpRegistration.storageInfo); 
                 
-      while(shouldRun) {
+      while(shouldRun && shouldServiceRun) {
         try {
           // reset name to machineName. Mainly for web interface. Same for all DB
           bpRegistration.name = machineName + ":" + bpRegistration.getPort();
@@ -854,7 +1012,7 @@ public class DataNode extends Configured
 
           break;
         } catch(SocketTimeoutException e) {  // namenode is busy
-          LOG.info("Problem connecting to server: " + nn_addr);
+          LOG.info("Problem connecting to server: " + nnAddr);
           try {
             Thread.sleep(1000);
           } catch (InterruptedException ie) {}
@@ -913,44 +1071,46 @@ public class DataNode extends Configured
      * That's the loop that connects to the NameNode and provides basic DataNode
      * functionality.
      *
-     * Only stop when "shouldRun" is turned off (which can only happen at shutdown).
+     * Only stop when "shouldRun" or "shouldServiceRun" is turned off, which can
+     * happen either at shutdown or due to refreshNamenodes.
      */
     public void run() {
-      LOG.info(bpRegistration + "In BPOfferService.run, data = " + data + 
-          ";bp="+blockPoolId);
+      LOG.info(bpRegistration + "In BPOfferService.run, data = " + data
+          + ";bp=" + blockPoolId);
 
-      //init stuff
       try {
-        // setup storage
-        setupBP(conf, dataDirs);
-        register();
-      } catch (IOException ioe) {
-        LOG.error(bpRegistration + ": Setup failed", ioe);
-        // TODO:FEDERATION should be local only
-        //shutdown();
-        return;
-      }
+        // init stuff
+        try {
+          // setup storage
+          setupBP(conf, dataDirs);
+          register();
+        } catch (IOException ioe) {
+          LOG.error(bpRegistration + ": Setup failed", ioe);
+          return;
+        }
 
-      initialized = true; // bp is initialized;
+        initialized = true; // bp is initialized;
 
-      while (shouldRun) {
-        try {
-          // TODO:FEDERATION needs to be moved too
-          startDistributedUpgradeIfNeeded();
-          offerService();
-        } catch (Exception ex) {
-          LOG.error("Exception: " + StringUtils.stringifyException(ex));
-          if (shouldRun) {
-            try {
-              Thread.sleep(5000);
-            } catch (InterruptedException ie) {
+        while (shouldRun && shouldServiceRun) {
+          try {
+            // TODO:FEDERATION needs to be moved too
+            startDistributedUpgradeIfNeeded();
+            offerService();
+          } catch (Exception ex) {
+            LOG.error("Exception: " + StringUtils.stringifyException(ex));
+            if (shouldRun && shouldServiceRun) {
+              try {
+                Thread.sleep(5000);
+              } catch (InterruptedException ie) {
+                LOG.warn("Received exception: ", ie);
+              }
             }
           }
         }
+      } finally {
+        LOG.info(dnRegistration + ":Finishing DataNode in: " + data);
+        cleanUp();
       }
-
-      LOG.info(dnRegistration + ":Finishing DataNode in: "+data);
-      shutdown();
     }
 
     /**
@@ -1012,12 +1172,12 @@ public class DataNode extends Configured
         break;
       case DatanodeProtocol.DNA_SHUTDOWN:
         // shut down the data node
-        shutdown();  //TODO:FEDERATION  - we should not shutdown the whole datanode.
+        shouldServiceRun = false;
         return false;
       case DatanodeProtocol.DNA_REGISTER:
         // namenode requested a registration - at start or if NN lost contact
         LOG.info("DatanodeCommand action: DNA_REGISTER");
-        if (shouldRun) {
+        if (shouldRun && shouldServiceRun) {
           register();
         }
         break;
@@ -1083,8 +1243,11 @@ public class DataNode extends Configured
 
     myMetrics = new DataNodeMetrics(conf, dnRegistration.getName());
 
-    // get all the NNs configured
-    nameNodeThreads = getAllNamenodes(conf);
+    blockPoolManager = new BlockPoolManager(conf, dnRegistration);
+  }
+  
+  BPOfferService[] getAllBpOs() {
+    return blockPoolManager.getAllNamenodeThreads();
   }
   
   /**
@@ -1138,29 +1301,6 @@ public class DataNode extends Configured
     // used until it is initialized in register().
     this.blockTokenSecretManager = new BlockTokenSecretManager(false, 0, 0);    
   }
-
-  /**
-   * for each namenode create an offerservice object 
-   * Threads will be started later (out of DataNode constructor)
-   * @param conf
-   * @throws IOException
-   */
-  private BPOfferService[] getAllNamenodes(Configuration conf)
-      throws IOException {
-    if(nameNodeThreads != null)
-      return nameNodeThreads; // already initialized
-    
-    // get NNs addresses from the configuration
-    InetSocketAddress[] isas = DFSUtil.getNNAddresses(conf);
-
-    AbstractList<BPOfferService> al = new ArrayList<BPOfferService> (isas.length);
-    for(InetSocketAddress isa : isas) {
-      BPOfferService bpos = new BPOfferService(isa, dnRegistration);
-      al.add(bpos);
-    }
-    nameNodeThreads = new BPOfferService[isas.length];
-    return al.toArray(nameNodeThreads);
-  }
   
   /**
    * Determine the http server's effective addr
@@ -1183,7 +1323,7 @@ public class DataNode extends Configured
   
   public DatanodeRegistration getDNRegistrationForBP(String bpid) 
   throws IOException {
-    BPOfferService bpos = bpMapping.get(bpid);
+    BPOfferService bpos = blockPoolManager.get(bpid);
     if(bpos==null || bpos.bpRegistration==null) {
       throw new IOException("cannot find BPOfferService for bpid="+bpid);
     }
@@ -1312,6 +1452,7 @@ public class DataNode extends Configured
     if (ipcServer != null) {
       ipcServer.stop();
     }
+    
     this.shouldRun = false;
     if (dataXceiverServer != null) {
       ((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
@@ -1343,21 +1484,10 @@ public class DataNode extends Configured
       }
     }
     
-    // interrupt all the threads, let them discover that shouldRun is false now
-    for(BPOfferService bpos : nameNodeThreads) {
-      if(bpos != null && bpos.bpThread!=null) {
-        bpos.bpThread.interrupt();
-        RPC.stopProxy(bpos.bpNamenode); // stop the RPC threads 
-      }
-    } 
-
-    // wait until the bp threads are done.
-    for(BPOfferService bpos : nameNodeThreads) {
-      if(bpos != null && bpos.bpThread!=null) {
-        try {
-          bpos.bpThread.join();
-        } catch (InterruptedException ignored) {}
-      }
+    try {
+      this.blockPoolManager.shutDownAll();
+    } catch (InterruptedException ie) {
+      LOG.warn("Received exception in BlockPoolManager#shutDownAll: ", ie);
     }
 
     if(upgradeManager != null)
@@ -1424,7 +1554,7 @@ public class DataNode extends Configured
       dp_error = DatanodeProtocol.FATAL_DISK_ERROR;
     }
     //inform NameNodes
-    for(BPOfferService bpos: nameNodeThreads) {
+    for(BPOfferService bpos: blockPoolManager.getAllNamenodeThreads()) {
       DatanodeProtocol nn = bpos.bpNamenode;
       try {
         nn.errorReport(bpos.bpRegistration, dp_error, errMsgr);
@@ -1437,7 +1567,7 @@ public class DataNode extends Configured
     }
     
     LOG.warn("DataNode is shutting down.\n" + errMsgr);
-    shouldRun = false; 
+    shutdown();
   }
     
   /** Number of concurrent xceivers per node. */
@@ -1690,7 +1820,7 @@ public class DataNode extends Configured
    */
   void closeBlock(ExtendedBlock block, String delHint) {
     myMetrics.blocksWritten.inc();
-    BPOfferService bpos = bpMapping.get(block.getPoolId());
+    BPOfferService bpos = blockPoolManager.get(block.getPoolId());
     if(bpos != null) {
       bpos.notifyNamenodeReceivedBlock(block, delHint);
     } else {
@@ -1706,14 +1836,7 @@ public class DataNode extends Configured
    *  If this thread is specifically interrupted, it will stop waiting.
    */
   public void runDatanodeDaemon() throws IOException {
-    if (nameNodeThreads != null) {
-      // Start namenode threads
-      for(BPOfferService bp : nameNodeThreads) {
-        bp.bpThread = new Thread(bp, dnThreadName);
-        bp.bpThread.setDaemon(true); // needed for JUnit testing
-        bp.bpThread.start();
-      }
-    }
+    blockPoolManager.startAll();
 
     // start dataXceiveServer
     dataXceiverServer.start();
@@ -1796,12 +1919,12 @@ public class DataNode extends Configured
   }
 
   void join() {
-    // TODO:FEDERATION do not ignore InterruptedException
-    for(BPOfferService bpos : nameNodeThreads) {
-      if(bpos.bpThread != null) 
-        try {
-          bpos.bpThread.join();
-        } catch (InterruptedException e) {}
+    while (shouldRun) {
+      try {
+        blockPoolManager.joinAll();
+      } catch (InterruptedException ex) {
+        LOG.warn("Received exception in Datanode#join: " + ex);
+      }
     }
   }
 
@@ -1913,7 +2036,7 @@ public class DataNode extends Configured
    * the block report at the next heartbeat.
    */
   public void scheduleAllBlockReport(long delay) {
-    for(BPOfferService bpos : nameNodeThreads) {
+    for(BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
       bpos.scheduleBlockReport(delay);
     }
   }
@@ -2079,7 +2202,7 @@ public class DataNode extends Configured
    * @throws IOException
    */
   public DatanodeProtocol getBPNamenode(String bpid) throws IOException {
-    BPOfferService bpos = bpMapping.get(bpid);
+    BPOfferService bpos = blockPoolManager.get(bpid);
     if(bpos == null || bpos.bpNamenode == null) {
       throw new IOException("cannot find a namnode proxy for bpid=" + bpid);
     }
@@ -2279,5 +2402,20 @@ public class DataNode extends Configured
   public String getClusterId() {
     return clusterId;
   }
+  
+  void refreshNamenodes(Configuration conf) throws IOException {
+    try {
+      blockPoolManager.refreshNamenodes(conf);
+    } catch (InterruptedException ex) {
+      IOException eio = new IOException();
+      eio.initCause(ex);
+      throw eio;
+    }
+  }
 
+  @Override //ClientDatanodeProtocol
+  public void refreshNamenodes() throws IOException {
+    conf = new Configuration();
+    refreshNamenodes(conf);
+  }
 }

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java?rev=1075599&r1=1075598&r2=1075599&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java Tue Mar  1 00:35:28 2011
@@ -61,6 +61,15 @@ implements Writable, NodeRegistration {
   }
   
   /**
+   * Copy constructor
+   */
+  public DatanodeRegistration(DatanodeRegistration from) {
+    super(from);
+    this.storageInfo = new StorageInfo();
+    this.exportedKeys = new ExportedBlockKeys();
+  }
+  
+  /**
    * Create DatanodeRegistration
    */
   public DatanodeRegistration(String nodeName) {

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java?rev=1075599&r1=1075598&r2=1075599&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java Tue Mar  1 00:35:28 2011
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.tools;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -36,6 +37,7 @@ import org.apache.hadoop.fs.shell.Comman
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
@@ -479,6 +481,7 @@ public class DFSAdmin extends FsShell {
       "\t[-refreshUserToGroupsMappings]\n" +
       "\t[refreshSuperUserGroupsConfiguration]\n" +
       "\t[-printTopology]\n" +
+      "\t[-refreshNamenodes datanodehost:port]\n"+
       "\t[-help [cmd]]\n";
 
     String report ="-report: \tReports basic filesystem information and statistics.\n";
@@ -541,6 +544,11 @@ public class DFSAdmin extends FsShell {
     String printTopology = "-printTopology: Print a tree of the racks and their\n" +
                            "\t\tnodes as reported by the Namenode\n";
     
+    String refreshNamenodes = "-refreshNamenodes: Takes a datanodehost:port as argument,\n"+
+                              "\t\tFor the given datanode, reloads the configuration files,\n" +
+                              "\t\tstops serving the removed block-pools\n"+
+                              "\t\tand starts serving new block-pools\n";
+    
     String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
       "\t\tis specified.\n";
 
@@ -576,6 +584,8 @@ public class DFSAdmin extends FsShell {
       System.out.println(refreshSuperUserGroupsConfiguration);
     } else if ("printTopology".equals(cmd)) {
       System.out.println(printTopology);
+    } else if ("refreshNamenodes".equals(cmd)) {
+      System.out.println(refreshNamenodes);
     } else if ("help".equals(cmd)) {
       System.out.println(help);
     } else {
@@ -596,6 +606,7 @@ public class DFSAdmin extends FsShell {
       System.out.println(refreshUserToGroupsMappings);
       System.out.println(refreshSuperUserGroupsConfiguration);
       System.out.println(printTopology);
+      System.out.println(refreshNamenodes);
       System.out.println(help);
       System.out.println();
       ToolRunner.printGenericCommandUsage(System.out);
@@ -875,6 +886,9 @@ public class DFSAdmin extends FsShell {
     } else if ("-printTopology".equals(cmd)) {
       System.err.println("Usage: java DFSAdmin"
                          + " [-printTopology]");
+    } else if ("-refreshNamenodes".equals(cmd)) {
+      System.err.println("Usage: java DFSAdmin"
+                         + " [-refreshNamenodes datanode-host:port]");
     } else {
       System.err.println("Usage: java DFSAdmin");
       System.err.println("           [-report]");
@@ -889,6 +903,7 @@ public class DFSAdmin extends FsShell {
       System.err.println("           [-refreshUserToGroupsMappings]");
       System.err.println("           [-refreshSuperUserGroupsConfiguration]");
       System.err.println("           [-printTopology]");
+      System.err.println("           [-refreshNamenodes datanodehost:port]");
       System.err.println("           ["+SetQuotaCommand.USAGE+"]");
       System.err.println("           ["+ClearQuotaCommand.USAGE+"]");
       System.err.println("           ["+SetSpaceQuotaCommand.USAGE+"]");
@@ -974,6 +989,11 @@ public class DFSAdmin extends FsShell {
         printUsage(cmd);
         return exitCode;
       }
+    } else if ("-refreshNamenodes".equals(cmd)) {
+      if (argv.length != 2) {
+        printUsage(cmd);
+        return exitCode;
+      }
     }
     
     // initialize DFSAdmin
@@ -1022,6 +1042,8 @@ public class DFSAdmin extends FsShell {
         exitCode = refreshSuperUserGroupsConfiguration();
       } else if ("-printTopology".equals(cmd)) {
         exitCode = printTopology();
+      } else if ("-refreshNamenodes".equals(cmd)) {
+        exitCode = refreshNamenodes(argv, i);
       } else if ("-help".equals(cmd)) {
         if (i < argv.length) {
           printHelp(argv[i]);
@@ -1059,6 +1081,38 @@ public class DFSAdmin extends FsShell {
     return exitCode;
   }
 
+  /**
+   * Asks the datanode at argv[i] (host:port) to reload its configuration and
+   * refresh the namenodes (block pools) it serves.
+   * @return 0 on success, -1 if argv[i] is not a valid host:port
+   */
+  private int refreshNamenodes(String[] argv, int i) throws IOException {
+    String datanode = argv[i];
+    int colonIndex = datanode.indexOf(':');
+    int port = -1;
+    try {
+      port = Integer.parseInt(datanode.substring(colonIndex + 1));
+    } catch (NumberFormatException ignored) {
+      // leave port at -1 so the check below rejects the address
+    }
+    if (colonIndex <= 0 || port < 0 || port > 65535) {
+      System.err.println("refreshNamenodes: invalid host:port: " + datanode);
+      return -1;
+    }
+    InetSocketAddress datanodeAddr = new InetSocketAddress(
+        datanode.substring(0, colonIndex), port);
+    // The RPC server for this call is the datanode, so use its principal.
+    Configuration conf = getConf();
+    conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY,
+        conf.get(DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY, ""));
+    ClientDatanodeProtocol refreshProtocol = (ClientDatanodeProtocol) RPC
+        .getProxy(ClientDatanodeProtocol.class,
+            ClientDatanodeProtocol.versionID, datanodeAddr, getUGI(), conf,
+            NetUtils.getSocketFactory(conf, ClientDatanodeProtocol.class));
+    refreshProtocol.refreshNamenodes();
+    return 0;
+  }
+
   /**
    * main() has some simple utility methods.
    * @param argv Command line parameters.

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java?rev=1075599&r1=1075598&r2=1075599&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java
(original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java
Tue Mar  1 00:35:28 2011
@@ -79,7 +79,6 @@ public class TestDataNodeMultipleRegistr
         false);
 
     // Setup the NameNode configuration
-    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, localHost + ":0");
     if (manageNameDfsDirs) {
       String name = fileAsURI(new File(base_dir, "name1")) + ","
           + fileAsURI(new File(base_dir, "name2"));
@@ -148,11 +147,13 @@ public class TestDataNodeMultipleRegistr
     int nnPort = 9928;
     String nnURL1 = "hdfs://" + localHost + ":" + Integer.toString(nnPort);
     FileSystem.setDefaultUri(conf, nnURL1);
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, localHost + ":50070");
     nn1 = startNameNode(conf, nnPort);
     
     nnPort = 9929;
     String nnURL2 = "hdfs://" + localHost + ":" + Integer.toString(nnPort);
     FileSystem.setDefaultUri(conf, nnURL2);
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, localHost + ":50071");
     nn2 = startNameNode(conf, nnPort);
     
     Assert.assertNotNull("cannot create nn1", nn1);
@@ -191,17 +192,24 @@ public class TestDataNodeMultipleRegistr
     Assert.assertEquals("number of volumes is wrong",2, volInfos.size());
     
     
-    for (BPOfferService bpos : dn.nameNodeThreads) {
+    for (BPOfferService bpos : dn.getAllBpOs()) {
       LOG.info("reg: bpid=" + "; name=" + bpos.bpRegistration.name
-          + "; sid=" + bpos.bpRegistration.storageID + "; nna=" + bpos.nn_addr);
+          + "; sid=" + bpos.bpRegistration.storageID + "; nna=" + bpos.nnAddr);
     }
     
-    BPOfferService bpos1 = dn.nameNodeThreads[0];
-    BPOfferService bpos2 = dn.nameNodeThreads[1];
+    BPOfferService bpos1 = dn.getAllBpOs()[0];
+    BPOfferService bpos2 = dn.getAllBpOs()[1];
+    
+    //The order of bpos is not guaranteed, so fix the order
+    if (bpos1.nnAddr.equals(nn2.getNameNodeAddress())) {
+      BPOfferService tmp = bpos1;
+      bpos1 = bpos2;
+      bpos2 = tmp;
+    }
 
-    Assert.assertEquals("wrong nn address", bpos1.nn_addr, nn1
+    Assert.assertEquals("wrong nn address", bpos1.nnAddr, nn1
         .getNameNodeAddress());
-    Assert.assertEquals("wrong nn address", bpos2.nn_addr, nn2
+    Assert.assertEquals("wrong nn address", bpos2.nnAddr, nn2
         .getNameNodeAddress());
     Assert.assertEquals("wrong bpid", bpos1.getBlockPoolId(), bpid1);
     Assert.assertEquals("wrong bpid", bpos2.getBlockPoolId(), bpid2);
@@ -232,6 +240,7 @@ public class TestDataNodeMultipleRegistr
     String nnURL = "hdfs://" + localHost + ":" + Integer.toString(nnPort);
 
     FileSystem.setDefaultUri(conf, nnURL);
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, localHost + ":50070");
     nn1 = startNameNode(conf, nnPort);
     Assert.assertNotNull("cannot create nn1", nn1);
 
@@ -260,19 +269,19 @@ public class TestDataNodeMultipleRegistr
     Assert.assertEquals("number of volumes is wrong",2, volInfos.size());
     
 
-    for (BPOfferService bpos : dn.nameNodeThreads) {
-      LOG.debug("reg: bpid=" + "; name=" + bpos.bpRegistration.name
-          + "; sid=" + bpos.bpRegistration.storageID + "; nna=" + bpos.nn_addr);
+    for (BPOfferService bpos : dn.getAllBpOs()) {
+      LOG.debug("reg: bpid=" + "; name=" + bpos.bpRegistration.name + "; sid="
+          + bpos.bpRegistration.storageID + "; nna=" + bpos.nnAddr);
     }
-    
+
     // try block report
-    BPOfferService bpos1 = dn.nameNodeThreads[0];
+    BPOfferService bpos1 = dn.getAllBpOs()[0];
     bpos1.lastBlockReport = 0;
     DatanodeCommand cmd = bpos1.blockReport();
 
     Assert.assertNotNull("cmd is null", cmd);
 
-    Assert.assertEquals("wrong nn address", bpos1.nn_addr, nn1
+    Assert.assertEquals("wrong nn address", bpos1.nnAddr, nn1
         .getNameNodeAddress());
     Assert.assertEquals("wrong bpid", bpos1.getBlockPoolId(), bpid1);
     Assert.assertEquals("wrong cid", dn.getClusterId(), cid1);
@@ -283,7 +292,7 @@ public class TestDataNodeMultipleRegistr
     nn1 = null;
   }
 
-  private void shutdownNN(NameNode nn) {
+  void shutdownNN(NameNode nn) {
     if (nn == null) {
       return;
     }
@@ -292,8 +301,8 @@ public class TestDataNodeMultipleRegistr
   }
 
   public boolean isDnUp(DataNode dn) {
-    boolean up = dn.nameNodeThreads.length > 0;
-    for (BPOfferService bpos : dn.nameNodeThreads) {
+    boolean up = dn.getAllBpOs().length > 0;
+    for (BPOfferService bpos : dn.getAllBpOs()) {
       up = up && bpos.initialized();
     }
     return up;

Added: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeShutdown.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeShutdown.java?rev=1075599&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeShutdown.java
(added)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeShutdown.java
Tue Mar  1 00:35:28 2011
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Tests shutdown and restart of a datanode that serves two namenodes. */
+public class TestDataNodeShutdown {
+  private final String localhost = "127.0.0.1";
+  private final int nnPort1 = 2231;
+  private final int nnPort2 = 2232;
+  private final String nnURL1 = "hdfs://" + localhost + ":" + nnPort1;
+  private final String nnURL2 = "hdfs://" + localhost + ":" + nnPort2;
+  private NameNode nn1 = null;
+  private NameNode nn2 = null;
+  private TestDataNodeMultipleRegistrations tdnmr = null;
+
+  @Before
+  public void setUp() throws Exception {
+    tdnmr = new TestDataNodeMultipleRegistrations();
+    tdnmr.setUp();
+  }
+
+  /** Starts nn1 and nn2, each with its own RPC port and HTTP address. */
+  private void startNamenodes() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1:0");
+
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:50071");
+    FileSystem.setDefaultUri(conf, nnURL1);
+    nn1 = tdnmr.startNameNode(conf, nnPort1);
+
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:50072");
+    FileSystem.setDefaultUri(conf, nnURL2);
+    nn2 = tdnmr.startNameNode(conf, nnPort2);
+  }
+
+  @Test
+  public void testDataNodeShutdown() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(DFSConfigKeys.DFS_FEDERATION_NAMENODES, nnURL1 +","+ nnURL2);
+    startNamenodes();
+    DataNode dn = tdnmr.startDataNode(conf);
+    try {
+      tdnmr.waitDataNodeUp(dn);
+    } finally {
+      dn.shutdown();
+    }
+    Assert.assertEquals(0, dn.getAllBpOs().length);
+
+    // Restart; the finally block stops the datanode even if a check fails
+    dn = tdnmr.startDataNode(conf);
+    try {
+      tdnmr.waitDataNodeUp(dn);
+      Assert.assertEquals(2, dn.getAllBpOs().length);
+    } finally {
+      dn.shutdown();
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    tdnmr.shutdownNN(nn1);
+    tdnmr.shutdownNN(nn2);
+  }
+}

Added: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java?rev=1075599&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java
(added)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java
Tue Mar  1 00:35:28 2011
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.datanode.DataNode.BPOfferService;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.junit.Before;
+import org.junit.Test;
+
+
+/** Tests that DataNode.refreshNamenodes() applies a changed namenode list. */
+public class TestRefreshNamenodes {
+  private final String localhost = "127.0.0.1";
+  private final int nnPort1 = 2221;
+  private final int nnPort2 = 2222;
+  private final int nnPort3 = 2223;
+  private final int nnPort4 = 2224;
+  private final String nnURL1 = "hdfs://" + localhost + ":" + nnPort1;
+  private final String nnURL2 = "hdfs://" + localhost + ":" + nnPort2;
+  private final String nnURL3 = "hdfs://" + localhost + ":" + nnPort3;
+  private final String nnURL4 = "hdfs://" + localhost + ":" + nnPort4;
+  private NameNode nn1 = null;
+  private NameNode nn2 = null;
+  private NameNode nn3 = null;
+  private NameNode nn4 = null;
+  private TestDataNodeMultipleRegistrations tdnmr = null;
+
+  @Before
+  public void setUp() throws Exception {
+    tdnmr = new TestDataNodeMultipleRegistrations();
+    tdnmr.setUp();
+  }
+
+  /** Starts nn1..nn4, each with its own RPC port and HTTP address. */
+  private void startNamenodes() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1:0");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:50071");
+    FileSystem.setDefaultUri(conf, nnURL1);
+    nn1 = tdnmr.startNameNode(conf, nnPort1);
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:50072");
+    FileSystem.setDefaultUri(conf, nnURL2);
+    nn2 = tdnmr.startNameNode(conf, nnPort2);
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:50073");
+    FileSystem.setDefaultUri(conf, nnURL3);
+    nn3 = tdnmr.startNameNode(conf, nnPort3);
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:50074");
+    FileSystem.setDefaultUri(conf, nnURL4);
+    nn4 = tdnmr.startNameNode(conf, nnPort4);
+  }
+
+  @Test
+  public void testRefreshNamenodes() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(DFSConfigKeys.DFS_FEDERATION_NAMENODES, nnURL1 +","+ nnURL2);
+    DataNode dn = null;
+    try {
+      startNamenodes();
+      dn = tdnmr.startDataNode(conf);
+      tdnmr.waitDataNodeUp(dn);
+      assertEquals(2, dn.getAllBpOs().length);
+
+      conf.set(DFSConfigKeys.DFS_FEDERATION_NAMENODES, nnURL1 + "," + nnURL3
+          + "," + nnURL4);
+      dn.refreshNamenodes(conf);
+      tdnmr.waitDataNodeUp(dn);
+      BPOfferService[] bpoList = dn.getAllBpOs();
+      assertEquals(3, bpoList.length);
+      // The order of getAllBpOs() is not guaranteed, so compare as a set
+      java.util.Set<InetSocketAddress> served =
+          new java.util.HashSet<InetSocketAddress>();
+      for (BPOfferService bpos : bpoList) {
+        served.add(bpos.nnAddr);
+      }
+      assertTrue(served.contains(nn1.getNameNodeAddress()));
+      assertTrue(served.contains(nn3.getNameNodeAddress()));
+      assertTrue(served.contains(nn4.getNameNodeAddress()));
+    } finally {
+      if (dn != null) {
+        dn.shutdown();
+      }
+      tdnmr.shutdownNN(nn1);
+      tdnmr.shutdownNN(nn2);
+      tdnmr.shutdownNN(nn3);
+      tdnmr.shutdownNN(nn4);
+    }
+  }
+}



Mime
View raw message