Return-Path: Delivered-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Received: (qmail 56136 invoked from network); 1 Mar 2011 00:35:52 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 1 Mar 2011 00:35:52 -0000 Received: (qmail 76073 invoked by uid 500); 1 Mar 2011 00:35:52 -0000 Delivered-To: apmail-hadoop-hdfs-commits-archive@hadoop.apache.org Received: (qmail 76009 invoked by uid 500); 1 Mar 2011 00:35:52 -0000 Mailing-List: contact hdfs-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hdfs-dev@hadoop.apache.org Delivered-To: mailing list hdfs-commits@hadoop.apache.org Received: (qmail 76001 invoked by uid 99); 1 Mar 2011 00:35:52 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 01 Mar 2011 00:35:52 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 01 Mar 2011 00:35:49 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 5350C2388906; Tue, 1 Mar 2011 00:35:29 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1075599 - in /hadoop/hdfs/branches/HDFS-1052: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/java/org/apache/hadoop/hdfs/server/protocol/ src/java/org/apac... 
Date: Tue, 01 Mar 2011 00:35:29 -0000 To: hdfs-commits@hadoop.apache.org From: jitendra@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110301003529.5350C2388906@eris.apache.org> Author: jitendra Date: Tue Mar 1 00:35:28 2011 New Revision: 1075599 URL: http://svn.apache.org/viewvc?rev=1075599&view=rev Log: Federation: Datanode command to refresh namenode list at the datanode. Added: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeShutdown.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java Modified: hadoop/hdfs/branches/HDFS-1052/CHANGES.txt hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSUtil.java hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java Modified: hadoop/hdfs/branches/HDFS-1052/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/CHANGES.txt?rev=1075599&r1=1075598&r2=1075599&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/CHANGES.txt (original) +++ hadoop/hdfs/branches/HDFS-1052/CHANGES.txt Tue Mar 1 00:35:28 2011 @@ -66,6 +66,9 @@ Trunk (unreleased changes) HDFS-1651. Tests fail due to null pointer exception in Datnode#shutdown() method. (Tanping via suresh) + HDFS-1649. Federation: Datanode command to refresh namenode list at + the datanode. (jitendra) + IMPROVEMENTS HDFS-1510. 
Added test-patch.properties required by test-patch.sh (nigel) Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSUtil.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1075599&r1=1075598&r2=1075599&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSUtil.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSUtil.java Tue Mar 1 00:35:28 2011 @@ -291,17 +291,16 @@ public class DFSUtil { * @return Array of InetSocketAddresses * @throws IOException */ - public static InetSocketAddress[] getNNAddresses(Configuration conf) + public static List getNNAddresses(Configuration conf) throws IOException { List nns = getNamenodeList(conf); if (nns == null) { throw new IOException("Federation namnodes are not configured correctly"); } - InetSocketAddress[] isas = new InetSocketAddress[nns.size()]; - int i = 0; + List isas = new ArrayList(); for (URI u : nns) { - isas[i++] = NameNode.getAddress(u); + isas.add(NameNode.getAddress(u)); } return isas; } Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=1075599&r1=1075598&r2=1075599&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Tue Mar 1 00:35:28 2011 @@ -23,23 +23,35 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import 
org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector; import org.apache.hadoop.ipc.VersionedProtocol; +import org.apache.hadoop.security.KerberosInfo; import org.apache.hadoop.security.token.TokenInfo; /** An client-datanode protocol for block recovery */ @InterfaceAudience.Private @InterfaceStability.Evolving +@KerberosInfo( + serverPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY) @TokenInfo(BlockTokenSelector.class) public interface ClientDatanodeProtocol extends VersionedProtocol { public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class); /** - * 7: Add block pool ID to Block + * 8: Add refreshNamenodes method */ - public static final long versionID = 7L; + public static final long versionID = 8L; /** Return the visible length of a replica. */ long getReplicaVisibleLength(ExtendedBlock b) throws IOException; + + /** + * Refresh the list of federated namenodes from updated configuration + * Adds new namenodes and stops the deleted namenodes. 
+ * + * @throws IOException on error + **/ + void refreshNamenodes() throws IOException; } Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1075599&r1=1075598&r2=1075599&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Mar 1 00:35:28 2011 @@ -196,15 +196,128 @@ public class DataNode extends Configured return NetUtils.createSocketAddr(target); } - BPOfferService[] nameNodeThreads; - private Map bpMapping = - new HashMap(); + /** + * Manages he BPOfferService objects for the data node. + * Creation, removal, starting, stopping, shutdown on BPOfferService + * objects must be done via APIs in this class. 
+ */ + @InterfaceAudience.Private + class BlockPoolManager { + private final Map bpMapping; + private final Map nameNodeThreads; + private final DatanodeRegistration dnReg; + + //This lock is used only to ensure exclusion of refreshNamenodes + private final Object refreshNamenodesLock = new Object(); + + BlockPoolManager(Configuration conf, DatanodeRegistration dnReg) + throws IOException { + this.dnReg = dnReg; + bpMapping = new HashMap(); + nameNodeThreads = new HashMap(); + + List isas = DFSUtil.getNNAddresses(conf); + + for(InetSocketAddress isa : isas) { + BPOfferService bpos = new BPOfferService(isa, dnReg); + nameNodeThreads.put(bpos.getNNSocketAddress(), bpos); + } + } + + synchronized void addBlockPool(BPOfferService t) { + if (nameNodeThreads.get(t.getNNSocketAddress()) == null) { + throw new IllegalArgumentException( + "Unknown BPOfferService thread for namenode address:" + + t.getNNSocketAddress()); + } + if (t.getBlockPoolId() == null) { + throw new IllegalArgumentException("Null blockpool id"); + } + bpMapping.put(t.getBlockPoolId(), t); + } + + /** + * Returns the array of BPOfferService objects. + * Caution: The BPOfferService returned could be shutdown any time. 
+ */ + synchronized BPOfferService[] getAllNamenodeThreads() { + BPOfferService[] bposArray = new BPOfferService[nameNodeThreads.values() + .size()]; + return nameNodeThreads.values().toArray(bposArray); + } + + synchronized BPOfferService get(String bpid) { + return bpMapping.get(bpid); + } + + synchronized void remove(BPOfferService t) { + nameNodeThreads.remove(t.getNNSocketAddress()); + bpMapping.remove(t.getBlockPoolId()); + } + + void shutDownAll() throws InterruptedException { + BPOfferService[] bposArray = this.getAllNamenodeThreads(); + for (BPOfferService bpos : bposArray) { + bpos.stop(); + } + } + + synchronized void startAll() { + for (BPOfferService bpos: nameNodeThreads.values()) { + bpos.start(); + } + } + + void joinAll() throws InterruptedException { + for (BPOfferService bpos: this.getAllNamenodeThreads()) { + bpos.join(); + } + } + + void refreshNamenodes(Configuration conf) + throws IOException, InterruptedException { + List newAddresses = DFSUtil.getNNAddresses(conf); + List toShutdown = new ArrayList(); + List toStart = new ArrayList(); + synchronized (refreshNamenodesLock) { + synchronized (this) { + for (InetSocketAddress nnaddr : nameNodeThreads.keySet()) { + if (!(newAddresses.contains(nnaddr))) { + toShutdown.add(nameNodeThreads.get(nnaddr)); + } + } + for (InetSocketAddress nnaddr : newAddresses) { + if (!(nameNodeThreads.containsKey(nnaddr))) { + toStart.add(nnaddr); + } + } + + for (InetSocketAddress nnaddr : toStart) { + BPOfferService bpos = new BPOfferService(nnaddr, dnReg); + nameNodeThreads.put(bpos.getNNSocketAddress(), bpos); + } + + for (BPOfferService bpos : toShutdown) { + remove(bpos); + } + } + + for (BPOfferService bpos : toShutdown) { + bpos.stop(); + } + // Now start the threads that are not already running. + startAll(); + } + } + } + + volatile boolean shouldRun = true; + private BlockPoolManager blockPoolManager; public DatanodeProtocol namenodeTODO_FED = null; //TODO:FEDERATION needs to be taken out. 
public FSDatasetInterface data = null; public DatanodeRegistration dnRegistration = null; private String clusterId = null; - volatile boolean shouldRun = true; public final static String EMPTY_DEL_HINT = ""; AtomicInteger xmitsInProgress = new AtomicInteger(); Daemon dataXceiverServer = null; @@ -429,7 +542,7 @@ public class DataNode extends Configured // calls specific to BP protected void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) { - BPOfferService bpos = bpMapping.get(block.getPoolId()); + BPOfferService bpos = blockPoolManager.get(block.getPoolId()); if(bpos != null) { bpos.notifyNamenodeReceivedBlock(block, delHint); } else { @@ -439,7 +552,7 @@ public class DataNode extends Configured } public void reportBadBlocks(ExtendedBlock block) throws IOException{ - BPOfferService bpos = bpMapping.get(block.getPoolId()); + BPOfferService bpos = blockPoolManager.get(block.getPoolId()); if(bpos == null || bpos.bpNamenode == null) { throw new IOException("cannot locate OfferService thread for bp="+block.getPoolId()); } @@ -456,7 +569,7 @@ public class DataNode extends Configured * */ class BPOfferService implements Runnable { - final InetSocketAddress nn_addr; + final InetSocketAddress nnAddr; DatanodeRegistration bpRegistration; NamespaceInfo bpNSInfo; long lastBlockReport = 0; @@ -467,10 +580,11 @@ public class DataNode extends Configured private boolean initialized = false; private final LinkedList receivedBlockList = new LinkedList(); private final LinkedList delHints = new LinkedList(); + volatile private boolean shouldServiceRun = true; BPOfferService(InetSocketAddress isa, DatanodeRegistration bpRegistration) { - this.bpRegistration = bpRegistration; - this.nn_addr = isa; + this.bpRegistration = new DatanodeRegistration(bpRegistration); + this.nnAddr = isa; } /** @@ -484,11 +598,15 @@ public class DataNode extends Configured public String getBlockPoolId() { return blockPoolId; } + + private InetSocketAddress getNNSocketAddress() { + return 
nnAddr; + } void setNamespaceInfo(NamespaceInfo nsinfo) { bpNSInfo = nsinfo; this.blockPoolId = nsinfo.getBlockPoolID(); - bpMapping.put(blockPoolId, this); + blockPoolManager.addBlockPool(this); } void setNameNode(DatanodeProtocol dnProtocol) { @@ -497,12 +615,12 @@ public class DataNode extends Configured private NamespaceInfo handshake() throws IOException { NamespaceInfo nsInfo = new NamespaceInfo(); - while (shouldRun) { + while (shouldRun && shouldServiceRun) { try { nsInfo = bpNamenode.versionRequest(); break; } catch(SocketTimeoutException e) { // namenode is busy - LOG.info("Problem connecting to server: " + nn_addr); + LOG.info("Problem connecting to server: " + nnAddr); try { Thread.sleep(1000); } catch (InterruptedException ie) {} @@ -521,7 +639,7 @@ public class DataNode extends Configured bpNamenode.errorReport( bpRegistration, DatanodeProtocol.NOTIFY, errorMsg ); } catch( SocketTimeoutException e ) { // namenode is busy - LOG.info("Problem connecting to server: " + nn_addr); + LOG.info("Problem connecting to server: " + nnAddr); } throw new IOException( errorMsg ); } @@ -538,8 +656,8 @@ public class DataNode extends Configured // get NN proxy DatanodeProtocol dnp = (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class, - DatanodeProtocol.versionID, nn_addr, conf); - LOG.info("NN proxy created in BP="+blockPoolId + " for " + nn_addr); + DatanodeProtocol.versionID, nnAddr, conf); + LOG.info("NN proxy created in BP="+blockPoolId + " for " + nnAddr); setNameNode(dnp); // handshake with NN @@ -560,7 +678,7 @@ public class DataNode extends Configured bpRegistration.storageInfo.namespaceID = bpNSInfo.namespaceID; bpRegistration.storageInfo.clusterID = bpNSInfo.clusterID; // TODO: FEDERATION - // bpRegistration.storageInfo.blockpoolID = bpNSInfo.blockpoolID; + //bpRegistration.storageInfo.blockpoolID = bpNSInfo.blockpoolID; } else { // read storage info, lock data dirs and transition fs state if necessary storage.recoverTransitionRead(blockPoolId, 
bpNSInfo, dataDirs, startOpt); @@ -726,14 +844,51 @@ public class DataNode extends Configured xmitsInProgress.get(), getXceiverCount()); } - + + //This must be called only by blockPoolManager + void start() { + if ((bpThread != null) && (bpThread.isAlive())) { + //Thread is started already + return; + } + bpThread = new Thread(this, dnThreadName); + bpThread.setDaemon(true); // needed for JUnit testing + bpThread.start(); + } + + //This must be called only by blockPoolManager. + void stop() { + shouldServiceRun = false; + if (bpThread != null) { + try { + bpThread.interrupt(); + bpThread.join(); + } catch (InterruptedException ex) { + LOG.warn("Received exception: ", ex); + } + } + } + + //This must be called only by blockPoolManager + void join() throws InterruptedException { + if (bpThread != null) { + bpThread.join(); + } + } + + //Cleanup method to be called by current thread before exiting. + private void cleanUp() { + blockPoolManager.remove(this); + shouldServiceRun = false; + RPC.stopProxy(bpNamenode); + } /** * Main loop for each BP thread. Run until shutdown, * forever calling remote NameNode functions. */ private void offerService() throws Exception { - LOG.info("For namenode " + nn_addr + " using BLOCKREPORT_INTERVAL of " + LOG.info("For namenode " + nnAddr + " using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec" + " Initial delay: " + initialBlockReportDelay + "msec" + "; heartBeatInterval=" + heartBeatInterval); @@ -741,7 +896,7 @@ public class DataNode extends Configured // // Now loop for a long time.... 
// - while (shouldRun) { + while (shouldRun && shouldServiceRun) { try { long startTime = now(); @@ -791,6 +946,8 @@ public class DataNode extends Configured try { receivedBlockList.wait(waitTime); } catch (InterruptedException ie) { + LOG.warn("BPOfferService for block pool=" + + this.getBlockPoolId() + " received exception:" + ie); } } } // synchronized @@ -801,7 +958,6 @@ public class DataNode extends Configured IncorrectVersionException.class.getName().equals(reClass)) { LOG.warn("DataNode is shutting down: " + StringUtils.stringifyException(re)); - shutdown(); // TODO:FEDERATION - ??? what to do here return; } LOG.warn(StringUtils.stringifyException(re)); @@ -813,8 +969,10 @@ public class DataNode extends Configured } } catch (IOException e) { LOG.warn(StringUtils.stringifyException(e)); + } finally { + shouldServiceRun = false; } - } // while (shouldRun) + } // while (shouldRun && shouldServiceRun) } // offerService @@ -835,7 +993,7 @@ public class DataNode extends Configured LOG.info("in register: sid=" + bpRegistration.getStorageID() + ";SI=" + bpRegistration.storageInfo); - while(shouldRun) { + while(shouldRun && shouldServiceRun) { try { // reset name to machineName. Mainly for web interface. Same for all DB bpRegistration.name = machineName + ":" + bpRegistration.getPort(); @@ -854,7 +1012,7 @@ public class DataNode extends Configured break; } catch(SocketTimeoutException e) { // namenode is busy - LOG.info("Problem connecting to server: " + nn_addr); + LOG.info("Problem connecting to server: " + nnAddr); try { Thread.sleep(1000); } catch (InterruptedException ie) {} @@ -913,44 +1071,46 @@ public class DataNode extends Configured * That's the loop that connects to the NameNode and provides basic DataNode * functionality. * - * Only stop when "shouldRun" is turned off (which can only happen at shutdown). + * Only stop when "shouldRun" or "shouldServiceRun" is turned off, which can + * happen either at shutdown or due to refreshNamenodes. 
*/ public void run() { - LOG.info(bpRegistration + "In BPOfferService.run, data = " + data + - ";bp="+blockPoolId); + LOG.info(bpRegistration + "In BPOfferService.run, data = " + data + + ";bp=" + blockPoolId); - //init stuff try { - // setup storage - setupBP(conf, dataDirs); - register(); - } catch (IOException ioe) { - LOG.error(bpRegistration + ": Setup failed", ioe); - // TODO:FEDERATION should be local only - //shutdown(); - return; - } + // init stuff + try { + // setup storage + setupBP(conf, dataDirs); + register(); + } catch (IOException ioe) { + LOG.error(bpRegistration + ": Setup failed", ioe); + return; + } - initialized = true; // bp is initialized; + initialized = true; // bp is initialized; - while (shouldRun) { - try { - // TODO:FEDERATION needs to be moved too - startDistributedUpgradeIfNeeded(); - offerService(); - } catch (Exception ex) { - LOG.error("Exception: " + StringUtils.stringifyException(ex)); - if (shouldRun) { - try { - Thread.sleep(5000); - } catch (InterruptedException ie) { + while (shouldRun && shouldServiceRun) { + try { + // TODO:FEDERATION needs to be moved too + startDistributedUpgradeIfNeeded(); + offerService(); + } catch (Exception ex) { + LOG.error("Exception: " + StringUtils.stringifyException(ex)); + if (shouldRun && shouldServiceRun) { + try { + Thread.sleep(5000); + } catch (InterruptedException ie) { + LOG.warn("Received exception: ", ie); + } } } } + } finally { + LOG.info(dnRegistration + ":Finishing DataNode in: " + data); + cleanUp(); } - - LOG.info(dnRegistration + ":Finishing DataNode in: "+data); - shutdown(); } /** @@ -1012,12 +1172,12 @@ public class DataNode extends Configured break; case DatanodeProtocol.DNA_SHUTDOWN: // shut down the data node - shutdown(); //TODO:FEDERATION - we should not shutdown the whole datanode. 
+ shouldServiceRun = false; return false; case DatanodeProtocol.DNA_REGISTER: // namenode requested a registration - at start or if NN lost contact LOG.info("DatanodeCommand action: DNA_REGISTER"); - if (shouldRun) { + if (shouldRun && shouldServiceRun) { register(); } break; @@ -1083,8 +1243,11 @@ public class DataNode extends Configured myMetrics = new DataNodeMetrics(conf, dnRegistration.getName()); - // get all the NNs configured - nameNodeThreads = getAllNamenodes(conf); + blockPoolManager = new BlockPoolManager(conf, dnRegistration); + } + + BPOfferService[] getAllBpOs() { + return blockPoolManager.getAllNamenodeThreads(); } /** @@ -1138,29 +1301,6 @@ public class DataNode extends Configured // used until it is initialized in register(). this.blockTokenSecretManager = new BlockTokenSecretManager(false, 0, 0); } - - /** - * for each namenode create an offerservice object - * Threads will be started later (out of DataNode constructor) - * @param conf - * @throws IOException - */ - private BPOfferService[] getAllNamenodes(Configuration conf) - throws IOException { - if(nameNodeThreads != null) - return nameNodeThreads; // already initialized - - // get NNs addresses from the configuration - InetSocketAddress[] isas = DFSUtil.getNNAddresses(conf); - - AbstractList al = new ArrayList (isas.length); - for(InetSocketAddress isa : isas) { - BPOfferService bpos = new BPOfferService(isa, dnRegistration); - al.add(bpos); - } - nameNodeThreads = new BPOfferService[isas.length]; - return al.toArray(nameNodeThreads); - } /** * Determine the http server's effective addr @@ -1183,7 +1323,7 @@ public class DataNode extends Configured public DatanodeRegistration getDNRegistrationForBP(String bpid) throws IOException { - BPOfferService bpos = bpMapping.get(bpid); + BPOfferService bpos = blockPoolManager.get(bpid); if(bpos==null || bpos.bpRegistration==null) { throw new IOException("cannot find BPOfferService for bpid="+bpid); } @@ -1312,6 +1452,7 @@ public class DataNode 
extends Configured if (ipcServer != null) { ipcServer.stop(); } + this.shouldRun = false; if (dataXceiverServer != null) { ((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill(); @@ -1343,21 +1484,10 @@ public class DataNode extends Configured } } - // interrupt all the threads, let them discover that shouldRun is false now - for(BPOfferService bpos : nameNodeThreads) { - if(bpos != null && bpos.bpThread!=null) { - bpos.bpThread.interrupt(); - RPC.stopProxy(bpos.bpNamenode); // stop the RPC threads - } - } - - // wait until the bp threads are done. - for(BPOfferService bpos : nameNodeThreads) { - if(bpos != null && bpos.bpThread!=null) { - try { - bpos.bpThread.join(); - } catch (InterruptedException ignored) {} - } + try { + this.blockPoolManager.shutDownAll(); + } catch (InterruptedException ie) { + LOG.warn("Received exception in BlockPoolManager#shutDownAll: ", ie); } if(upgradeManager != null) @@ -1424,7 +1554,7 @@ public class DataNode extends Configured dp_error = DatanodeProtocol.FATAL_DISK_ERROR; } //inform NameNodes - for(BPOfferService bpos: nameNodeThreads) { + for(BPOfferService bpos: blockPoolManager.getAllNamenodeThreads()) { DatanodeProtocol nn = bpos.bpNamenode; try { nn.errorReport(bpos.bpRegistration, dp_error, errMsgr); @@ -1437,7 +1567,7 @@ public class DataNode extends Configured } LOG.warn("DataNode is shutting down.\n" + errMsgr); - shouldRun = false; + shutdown(); } /** Number of concurrent xceivers per node. */ @@ -1690,7 +1820,7 @@ public class DataNode extends Configured */ void closeBlock(ExtendedBlock block, String delHint) { myMetrics.blocksWritten.inc(); - BPOfferService bpos = bpMapping.get(block.getPoolId()); + BPOfferService bpos = blockPoolManager.get(block.getPoolId()); if(bpos != null) { bpos.notifyNamenodeReceivedBlock(block, delHint); } else { @@ -1706,14 +1836,7 @@ public class DataNode extends Configured * If this thread is specifically interrupted, it will stop waiting. 
*/ public void runDatanodeDaemon() throws IOException { - if (nameNodeThreads != null) { - // Start namenode threads - for(BPOfferService bp : nameNodeThreads) { - bp.bpThread = new Thread(bp, dnThreadName); - bp.bpThread.setDaemon(true); // needed for JUnit testing - bp.bpThread.start(); - } - } + blockPoolManager.startAll(); // start dataXceiveServer dataXceiverServer.start(); @@ -1796,12 +1919,12 @@ public class DataNode extends Configured } void join() { - // TODO:FEDERATION do not ignore InterruptedException - for(BPOfferService bpos : nameNodeThreads) { - if(bpos.bpThread != null) - try { - bpos.bpThread.join(); - } catch (InterruptedException e) {} + while (shouldRun) { + try { + blockPoolManager.joinAll(); + } catch (InterruptedException ex) { + LOG.warn("Received exception in Datanode#join: " + ex); + } } } @@ -1913,7 +2036,7 @@ public class DataNode extends Configured * the block report at the next heartbeat. */ public void scheduleAllBlockReport(long delay) { - for(BPOfferService bpos : nameNodeThreads) { + for(BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) { bpos.scheduleBlockReport(delay); } } @@ -2079,7 +2202,7 @@ public class DataNode extends Configured * @throws IOException */ public DatanodeProtocol getBPNamenode(String bpid) throws IOException { - BPOfferService bpos = bpMapping.get(bpid); + BPOfferService bpos = blockPoolManager.get(bpid); if(bpos == null || bpos.bpNamenode == null) { throw new IOException("cannot find a namnode proxy for bpid=" + bpid); } @@ -2279,5 +2402,20 @@ public class DataNode extends Configured public String getClusterId() { return clusterId; } + + void refreshNamenodes(Configuration conf) throws IOException { + try { + blockPoolManager.refreshNamenodes(conf); + } catch (InterruptedException ex) { + IOException eio = new IOException(); + eio.initCause(ex); + throw eio; + } + } + @Override //ClientDatanodeProtocol + public void refreshNamenodes() throws IOException { + conf = new Configuration(); + 
refreshNamenodes(conf); + } } Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java?rev=1075599&r1=1075598&r2=1075599&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java Tue Mar 1 00:35:28 2011 @@ -61,6 +61,15 @@ implements Writable, NodeRegistration { } /** + * Copy constructor + */ + public DatanodeRegistration(DatanodeRegistration from) { + super(from); + this.storageInfo = new StorageInfo(); + this.exportedKeys = new ExportedBlockKeys(); + } + + /** * Create DatanodeRegistration */ public DatanodeRegistration(String nodeName) { Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java?rev=1075599&r1=1075598&r2=1075599&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java Tue Mar 1 00:35:28 2011 @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.tools; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -36,6 +37,7 @@ import org.apache.hadoop.fs.shell.Comman import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; +import 
org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.FSConstants; import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType; @@ -479,6 +481,7 @@ public class DFSAdmin extends FsShell { "\t[-refreshUserToGroupsMappings]\n" + "\t[refreshSuperUserGroupsConfiguration]\n" + "\t[-printTopology]\n" + + "\t[-refreshNamenodes datanodehost:port]\n"+ "\t[-help [cmd]]\n"; String report ="-report: \tReports basic filesystem information and statistics.\n"; @@ -541,6 +544,11 @@ public class DFSAdmin extends FsShell { String printTopology = "-printTopology: Print a tree of the racks and their\n" + "\t\tnodes as reported by the Namenode\n"; + String refreshNamenodes = "-refreshNamenodes: Takes a datanodehost:port as argument,\n"+ + "\t\tFor the given datanode, reloads the configuration files,\n" + + "\t\tstops serving the removed block-pools\n"+ + "\t\tand starts serving new block-pools\n"; + String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" + "\t\tis specified.\n"; @@ -576,6 +584,8 @@ public class DFSAdmin extends FsShell { System.out.println(refreshSuperUserGroupsConfiguration); } else if ("printTopology".equals(cmd)) { System.out.println(printTopology); + } else if ("refreshNamenodes".equals(cmd)) { + System.out.println(refreshNamenodes); } else if ("help".equals(cmd)) { System.out.println(help); } else { @@ -596,6 +606,7 @@ public class DFSAdmin extends FsShell { System.out.println(refreshUserToGroupsMappings); System.out.println(refreshSuperUserGroupsConfiguration); System.out.println(printTopology); + System.out.println(refreshNamenodes); System.out.println(help); System.out.println(); ToolRunner.printGenericCommandUsage(System.out); @@ -875,6 +886,9 @@ public class DFSAdmin extends FsShell { } else if ("-printTopology".equals(cmd)) { System.err.println("Usage: java DFSAdmin" + " [-printTopology]"); + } else if 
("-refreshNamenodes".equals(cmd)) { + System.err.println("Usage: java DFSAdmin" + + " [-refreshNamenodes datanode-host:port]"); } else { System.err.println("Usage: java DFSAdmin"); System.err.println(" [-report]"); @@ -889,6 +903,7 @@ public class DFSAdmin extends FsShell { System.err.println(" [-refreshUserToGroupsMappings]"); System.err.println(" [-refreshSuperUserGroupsConfiguration]"); System.err.println(" [-printTopology]"); + System.err.println(" [-refreshNamenodes datanodehost:port]"); System.err.println(" ["+SetQuotaCommand.USAGE+"]"); System.err.println(" ["+ClearQuotaCommand.USAGE+"]"); System.err.println(" ["+SetSpaceQuotaCommand.USAGE+"]"); @@ -974,6 +989,11 @@ public class DFSAdmin extends FsShell { printUsage(cmd); return exitCode; } + } else if ("-refreshNamenodes".equals(cmd)) { + if (argv.length != 2) { + printUsage(cmd); + return exitCode; + } } // initialize DFSAdmin @@ -1022,6 +1042,8 @@ public class DFSAdmin extends FsShell { exitCode = refreshSuperUserGroupsConfiguration(); } else if ("-printTopology".equals(cmd)) { exitCode = printTopology(); + } else if ("-refreshNamenodes".equals(cmd)) { + exitCode = refreshNamenodes(argv, i); } else if ("-help".equals(cmd)) { if (i < argv.length) { printHelp(argv[i]); @@ -1059,6 +1081,38 @@ public class DFSAdmin extends FsShell { return exitCode; } + private int refreshNamenodes(String[] argv, int i) throws IOException { + String datanode = argv[i]; + + int colonIndex = datanode.indexOf(':'); + String datanodeHostname = datanode.substring(0, colonIndex); + String portString = datanode.substring(colonIndex+1); + int port = Integer.valueOf(portString).intValue(); + + InetSocketAddress datanodeAddr = new InetSocketAddress(datanodeHostname, + port); + + // Get the current configuration + Configuration conf = getConf(); + + // for security authorization + // server principal for this call + // should be NN's one. 
+ conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY, + conf.get(DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY, "")); + + // Create the client + ClientDatanodeProtocol refreshProtocol = (ClientDatanodeProtocol) RPC + .getProxy(ClientDatanodeProtocol.class, + ClientDatanodeProtocol.versionID, datanodeAddr, getUGI(), conf, + NetUtils.getSocketFactory(conf, ClientDatanodeProtocol.class)); + + // Refresh the authorization policy in-effect + refreshProtocol.refreshNamenodes(); + + return 0; + } + /** * main() has some simple utility methods. * @param argv Command line parameters. Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java?rev=1075599&r1=1075598&r2=1075599&view=diff ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java (original) +++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java Tue Mar 1 00:35:28 2011 @@ -79,7 +79,6 @@ public class TestDataNodeMultipleRegistr false); // Setup the NameNode configuration - conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, localHost + ":0"); if (manageNameDfsDirs) { String name = fileAsURI(new File(base_dir, "name1")) + "," + fileAsURI(new File(base_dir, "name2")); @@ -148,11 +147,13 @@ public class TestDataNodeMultipleRegistr int nnPort = 9928; String nnURL1 = "hdfs://" + localHost + ":" + Integer.toString(nnPort); FileSystem.setDefaultUri(conf, nnURL1); + conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, localHost + ":50070"); nn1 = startNameNode(conf, nnPort); nnPort = 9929; String nnURL2 = "hdfs://" + localHost + ":" + Integer.toString(nnPort); 
+ FileSystem.setDefaultUri(conf, nnURL2); + conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, localHost + ":50071"); nn2 = startNameNode(conf, nnPort); Assert.assertNotNull("cannot create nn1", nn1); @@ -191,17 +192,24 @@ public class TestDataNodeMultipleRegistr Assert.assertEquals("number of volumes is wrong",2, volInfos.size()); - for (BPOfferService bpos : dn.nameNodeThreads) { + for (BPOfferService bpos : dn.getAllBpOs()) { - LOG.info("reg: bpid=" + "; name=" + bpos.bpRegistration.name - + "; sid=" + bpos.bpRegistration.storageID + "; nna=" + bpos.nn_addr); + LOG.info("reg: bpid=" + bpos.getBlockPoolId() + "; name=" + bpos.bpRegistration.name + + "; sid=" + bpos.bpRegistration.storageID + "; nna=" + bpos.nnAddr); } - BPOfferService bpos1 = dn.nameNodeThreads[0]; - BPOfferService bpos2 = dn.nameNodeThreads[1]; + BPOfferService bpos1 = dn.getAllBpOs()[0]; + BPOfferService bpos2 = dn.getAllBpOs()[1]; + + // getAllBpOs() does not guarantee an order, so swap if needed to make bpos1 belong to nn1 + if (bpos1.nnAddr.equals(nn2.getNameNodeAddress())) { + BPOfferService tmp = bpos1; + bpos1 = bpos2; + bpos2 = tmp; + } - Assert.assertEquals("wrong nn address", bpos1.nn_addr, nn1 + Assert.assertEquals("wrong nn address", bpos1.nnAddr, nn1 .getNameNodeAddress()); - Assert.assertEquals("wrong nn address", bpos2.nn_addr, nn2 + Assert.assertEquals("wrong nn address", bpos2.nnAddr, nn2 .getNameNodeAddress()); Assert.assertEquals("wrong bpid", bpos1.getBlockPoolId(), bpid1); Assert.assertEquals("wrong bpid", bpos2.getBlockPoolId(), bpid2); @@ -232,6 +240,7 @@ public class TestDataNodeMultipleRegistr String nnURL = "hdfs://" + localHost + ":" + Integer.toString(nnPort); FileSystem.setDefaultUri(conf, nnURL); + conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, localHost + ":50070"); nn1 = startNameNode(conf, nnPort); Assert.assertNotNull("cannot create nn1", nn1); @@ -260,19 +269,19 @@ public class TestDataNodeMultipleRegistr Assert.assertEquals("number of volumes is wrong",2, volInfos.size()); - for (BPOfferService bpos : dn.nameNodeThreads) { - LOG.debug("reg: bpid=" + "; name=" + 
bpos.bpRegistration.name - + "; sid=" + bpos.bpRegistration.storageID + "; nna=" + bpos.nn_addr); + for (BPOfferService bpos : dn.getAllBpOs()) { + LOG.debug("reg: bpid=" + bpos.getBlockPoolId() + "; name=" + bpos.bpRegistration.name + + "; sid=" + bpos.bpRegistration.storageID + "; nna=" + bpos.nnAddr); } - + // try block report - BPOfferService bpos1 = dn.nameNodeThreads[0]; + BPOfferService bpos1 = dn.getAllBpOs()[0]; bpos1.lastBlockReport = 0; DatanodeCommand cmd = bpos1.blockReport(); Assert.assertNotNull("cmd is null", cmd); - Assert.assertEquals("wrong nn address", bpos1.nn_addr, nn1 + Assert.assertEquals("wrong nn address", bpos1.nnAddr, nn1 .getNameNodeAddress()); Assert.assertEquals("wrong bpid", bpos1.getBlockPoolId(), bpid1); Assert.assertEquals("wrong cid", dn.getClusterId(), cid1); @@ -283,7 +292,7 @@ public class TestDataNodeMultipleRegistr nn1 = null; } - private void shutdownNN(NameNode nn) { + void shutdownNN(NameNode nn) { if (nn == null) { return; } @@ -292,8 +301,10 @@ public class TestDataNodeMultipleRegistr } public boolean isDnUp(DataNode dn) { - boolean up = dn.nameNodeThreads.length > 0; - for (BPOfferService bpos : dn.nameNodeThreads) { + // Take a single snapshot so the length check and the loop see the same array + BPOfferService[] bposArray = dn.getAllBpOs(); + boolean up = bposArray.length > 0; + for (BPOfferService bpos : bposArray) { up = up && bpos.initialized(); } return up; Added: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeShutdown.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeShutdown.java?rev=1075599&view=auto ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeShutdown.java (added) +++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeShutdown.java Tue Mar 1 00:35:28 2011 @@ 
-0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestDataNodeShutdown { + + private String localhost = "127.0.0.1"; + private int nnPort1 = 2231; + private int nnPort2 = 2232; + private final String nnURL1 = "hdfs://" + localhost + ":" + nnPort1; + private final String nnURL2 = "hdfs://" + localhost + ":" + nnPort2; + private NameNode nn1 = null; + private NameNode nn2 = null; + private TestDataNodeMultipleRegistrations tdnmr = null; + + @Before + public void setUp() throws Exception { + tdnmr = new TestDataNodeMultipleRegistrations(); + tdnmr.setUp(); + } + + private void startNamenodes() throws IOException { + Configuration conf = new HdfsConfiguration(); + conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1:0"); + + conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:50071"); + FileSystem.setDefaultUri(conf, nnURL1); + nn1 
= tdnmr.startNameNode(conf, nnPort1); + + conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:50072"); + FileSystem.setDefaultUri(conf, nnURL2); + nn2 = tdnmr.startNameNode(conf, nnPort2); + } + + @Test + public void testDataNodeShutdown() throws IOException { + Configuration conf = new Configuration(); + + conf.set(DFSConfigKeys.DFS_FEDERATION_NAMENODES, nnURL1 +","+ nnURL2); + startNamenodes(); + + DataNode dn = tdnmr.startDataNode(conf); + tdnmr.waitDataNodeUp(dn); + + //shutdown datanode + dn.shutdown(); + + Assert.assertEquals(0, dn.getAllBpOs().length); + + //Restart datanode + dn = tdnmr.startDataNode(conf); + tdnmr.waitDataNodeUp(dn); + Assert.assertEquals(2, dn.getAllBpOs().length); + + dn.shutdown(); + } + + @After + public void tearDown() throws Exception { + tdnmr.shutdownNN(nn1); + tdnmr.shutdownNN(nn2); + } + +} Added: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java?rev=1075599&view=auto ============================================================================== --- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java (added) +++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java Tue Mar 1 00:35:28 2011 @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.datanode.DataNode.BPOfferService; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + + +public class TestRefreshNamenodes { + + private String localhost = "127.0.0.1"; + private int nnPort1 = 2221; + private int nnPort2 = 2222; + private int nnPort3 = 2223; + private int nnPort4 = 2224; + private final String nnURL1 = "hdfs://" + localhost + ":" + nnPort1; + private final String nnURL2 = "hdfs://" + localhost + ":" + nnPort2; + private final String nnURL3 = "hdfs://" + localhost + ":" + nnPort3; + private final String nnURL4 = "hdfs://" + localhost + ":" + nnPort4; + private NameNode nn1 = null; + private NameNode nn2 = null; + private NameNode nn3 = null; + private NameNode nn4 = null; + private TestDataNodeMultipleRegistrations tdnmr = null; + private DataNode dn = null; + + @Before + public void setUp() throws Exception { + tdnmr = new TestDataNodeMultipleRegistrations(); + tdnmr.setUp(); + } + + private void startNamenodes() throws IOException { + Configuration conf = new HdfsConfiguration(); + conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1:0"); + 
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:50071"); + FileSystem.setDefaultUri(conf, nnURL1); + nn1 = tdnmr.startNameNode(conf, nnPort1); + + conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:50072"); + FileSystem.setDefaultUri(conf, nnURL2); + nn2 = tdnmr.startNameNode(conf, nnPort2); + + conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:50073"); + FileSystem.setDefaultUri(conf, nnURL3); + nn3 = tdnmr.startNameNode(conf, nnPort3); + + conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:50074"); + FileSystem.setDefaultUri(conf, nnURL4); + nn4 = tdnmr.startNameNode(conf, nnPort4); + } + + @Test + public void testRefreshNamenodes() throws IOException { + Configuration conf = new Configuration(); + + conf.set(DFSConfigKeys.DFS_FEDERATION_NAMENODES, nnURL1 +","+ nnURL2); + startNamenodes(); + + dn = tdnmr.startDataNode(conf); + tdnmr.waitDataNodeUp(dn); + + assertEquals(2, dn.getAllBpOs().length); + conf.set(DFSConfigKeys.DFS_FEDERATION_NAMENODES, nnURL1 + "," + nnURL3 + + "," + nnURL4); + dn.refreshNamenodes(conf); + tdnmr.waitDataNodeUp(dn); + BPOfferService[] bpoList = dn.getAllBpOs(); + assertEquals(3, bpoList.length); + + // getAllBpOs() does not guarantee an order, so check membership rather + // than position: each expected namenode must own one BPOfferService. + for (NameNode nn : new NameNode[] { nn1, nn3, nn4 }) { + InetSocketAddress expected = nn.getNameNodeAddress(); + boolean found = false; + for (BPOfferService bpos : bpoList) { + found = found || bpos.nnAddr.equals(expected); + } + assertTrue("no BPOfferService for namenode " + expected, found); + } + } + + @After + public void tearDown() throws Exception { + // Clean up here so a failed assertion does not leave the datanode and + // namenodes running on their fixed ports. + if (dn != null) { + dn.shutdown(); + } + tdnmr.shutdownNN(nn1); + tdnmr.shutdownNN(nn2); + tdnmr.shutdownNN(nn3); + tdnmr.shutdownNN(nn4); + } +}