Return-Path: X-Original-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2439E97E0 for ; Sat, 19 Nov 2011 01:32:23 +0000 (UTC) Received: (qmail 25978 invoked by uid 500); 19 Nov 2011 01:32:23 -0000 Delivered-To: apmail-hadoop-hdfs-commits-archive@hadoop.apache.org Received: (qmail 25952 invoked by uid 500); 19 Nov 2011 01:32:23 -0000 Mailing-List: contact hdfs-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hdfs-dev@hadoop.apache.org Delivered-To: mailing list hdfs-commits@hadoop.apache.org Received: (qmail 25942 invoked by uid 99); 19 Nov 2011 01:32:22 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 19 Nov 2011 01:32:22 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 19 Nov 2011 01:32:20 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 9B51F23889BB; Sat, 19 Nov 2011 01:32:00 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1203943 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/test/java/org/apache/hadoop/hdfs/server/datanode/ Date: Sat, 19 Nov 2011 01:32:00 -0000 To: hdfs-commits@hadoop.apache.org From: todd@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111119013200.9B51F23889BB@eris.apache.org> Author: todd Date: Sat Nov 19 01:31:59 2011 New Revision: 1203943 URL: http://svn.apache.org/viewvc?rev=1203943&view=rev Log: HDFS-2563. Some cleanup in BPOfferService. Contributed by Todd Lipcon. Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1203943&r1=1203942&r2=1203943&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Sat Nov 19 01:31:59 2011 @@ -117,6 +117,8 @@ Release 0.23.1 - UNRELEASED HDFS-2562. Refactor DN configuration variables out of DataNode class (todd) + HDFS-2563. Some cleanup in BPOfferService. (todd) + OPTIMIZATIONS HDFS-2130. Switch default checksum to CRC32C. (todd) Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1203943&r1=1203942&r2=1203943&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Sat Nov 19 01:31:59 2011 @@ -176,6 +176,9 @@ import org.apache.hadoop.util.StringUtil import org.apache.hadoop.util.VersionInfo; import org.mortbay.util.ajax.JSON; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + /********************************************************** * DataNode is a class (and program) that stores a set of @@ -704,8 +707,21 @@ public class DataNode extends Configured @InterfaceAudience.Private static class BPOfferService implements Runnable { final InetSocketAddress nnAddr; - DatanodeRegistration bpRegistration; + + /** + * Information about the namespace that this service + * is registering with. This is assigned after + * the first phase of the handshake. + */ NamespaceInfo bpNSInfo; + + /** + * The registration information for this block pool. + * This is assigned after the second phase of the + * handshake. + */ + DatanodeRegistration bpRegistration; + long lastBlockReport = 0; long lastDeletedReport = 0; @@ -713,7 +729,6 @@ public class DataNode extends Configured private Thread bpThread; private DatanodeProtocol bpNamenode; - private String blockPoolId; private long lastHeartbeat = 0; private volatile boolean initialized = false; private final LinkedList receivedAndDeletedBlockList @@ -726,7 +741,6 @@ public class DataNode extends Configured BPOfferService(InetSocketAddress nnAddr, DataNode dn) { this.dn = dn; - this.bpRegistration = dn.createRegistration(); this.nnAddr = nnAddr; this.dnConf = dn.getDnConf(); } @@ -736,7 +750,7 @@ public class DataNode extends Configured * and has registered with the corresponding namenode * @return true if initialized */ - public boolean initialized() { + public boolean isInitialized() { return initialized; } @@ -745,41 +759,67 @@ public class DataNode extends Configured } public String getBlockPoolId() { - return blockPoolId; + if (bpNSInfo != null) { + return bpNSInfo.getBlockPoolID(); + } else { + LOG.warn("Block pool ID needed, but service not yet registered with NN", + new Exception("trace")); + return null; + } + } + + public NamespaceInfo getNamespaceInfo() { + return bpNSInfo; + } + + public String toString() { + if (bpNSInfo == null) { + // If we haven't yet connected to our NN, we don't yet know our + // own block pool ID. + // If _none_ of the block pools have connected yet, we don't even + // know the storage ID of this DN. + String storageId = dn.getStorageId(); + if (storageId == null || "".equals(storageId)) { + storageId = "unknown"; + } + return "Block pool (storage id " + storageId + + ") connecting to " + nnAddr; + } else { + return "Block pool " + getBlockPoolId() + + " (storage id " + dn.getStorageId() + + ") registered with " + nnAddr; + } } private InetSocketAddress getNNSocketAddress() { return nnAddr; } - void setNamespaceInfo(NamespaceInfo nsinfo) { - bpNSInfo = nsinfo; - this.blockPoolId = nsinfo.getBlockPoolID(); - } - + /** + * Used to inject a spy NN in the unit tests. + */ + @VisibleForTesting void setNameNode(DatanodeProtocol dnProtocol) { - bpNamenode = dnProtocol; + bpNamenode = dnProtocol; } - private NamespaceInfo handshake() throws IOException { - NamespaceInfo nsInfo = new NamespaceInfo(); - while (dn.shouldRun && shouldServiceRun) { + /** + * Perform the first part of the handshake with the NameNode. + * This calls versionRequest to determine the NN's + * namespace and version info. It automatically retries until + * the NN responds or the DN is shutting down. + * + * @return the NamespaceInfo + * @throws IncorrectVersionException if the remote NN does not match + * this DN's version + */ + NamespaceInfo retrieveNamespaceInfo() throws IncorrectVersionException { + NamespaceInfo nsInfo = null; + while (shouldRun()) { try { nsInfo = bpNamenode.versionRequest(); - // verify build version - String nsVer = nsInfo.getBuildVersion(); - String stVer = Storage.getBuildVersion(); - LOG.info("handshake: namespace info = " + nsInfo); - - if(! nsVer.equals(stVer)) { - String errorMsg = "Incompatible build versions: bp = " + blockPoolId + - "namenode BV = " + nsVer + "; datanode BV = " + stVer; - LOG.warn(errorMsg); - bpNamenode.errorReport( bpRegistration, - DatanodeProtocol.NOTIFY, errorMsg ); - } else { - break; - } + LOG.debug(this + " received versionRequest response: " + nsInfo); + break; } catch(SocketTimeoutException e) { // namenode is busy LOG.warn("Problem connecting to server: " + nnAddr); } catch(IOException e ) { // namenode is not available @@ -787,40 +827,53 @@ public class DataNode extends Configured } // try again in a second - try { - Thread.sleep(5000); - } catch (InterruptedException ie) {} + sleepAndLogInterrupts(5000, "requesting version info from NN"); } - assert HdfsConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() : - "Data-node and name-node layout versions must be the same." - + "Expected: "+ HdfsConstants.LAYOUT_VERSION - + " actual "+ nsInfo.getLayoutVersion(); + if (nsInfo != null) { + checkNNVersion(nsInfo); + } return nsInfo; } - void setupBP(Configuration conf) - throws IOException { + private void checkNNVersion(NamespaceInfo nsInfo) + throws IncorrectVersionException { + // build and layout versions should match + String nsBuildVer = nsInfo.getBuildVersion(); + String stBuildVer = Storage.getBuildVersion(); + if (!nsBuildVer.equals(stBuildVer)) { + LOG.warn("Data-node and name-node Build versions must be the same. " + + "Namenode build version: " + nsBuildVer + "Datanode " + + "build version: " + stBuildVer); + throw new IncorrectVersionException(nsBuildVer, "namenode", stBuildVer); + } + + if (HdfsConstants.LAYOUT_VERSION != nsInfo.getLayoutVersion()) { + LOG.warn("Data-node and name-node layout versions must be the same." + + " Expected: "+ HdfsConstants.LAYOUT_VERSION + + " actual "+ bpNSInfo.getLayoutVersion()); + throw new IncorrectVersionException( + bpNSInfo.getLayoutVersion(), "namenode"); + } + } + + private void connectToNNAndHandshake() throws IOException { // get NN proxy - DatanodeProtocol dnp = + bpNamenode = (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class, - DatanodeProtocol.versionID, nnAddr, conf); - setNameNode(dnp); + DatanodeProtocol.versionID, nnAddr, dn.getConf()); - // handshake with NN - NamespaceInfo nsInfo = handshake(); - setNamespaceInfo(nsInfo); - dn.initBlockPool(this, nsInfo); + // First phase of the handshake with NN - get the namespace + // info. + bpNSInfo = retrieveNamespaceInfo(); - bpRegistration.setStorageID(dn.getStorageId()); - StorageInfo storageInfo = dn.storage.getBPStorage(blockPoolId); - if (storageInfo == null) { - // it's null in the case of SimulatedDataSet - bpRegistration.storageInfo.layoutVersion = HdfsConstants.LAYOUT_VERSION; - bpRegistration.setStorageInfo(nsInfo); - } else { - bpRegistration.setStorageInfo(storageInfo); - } + // Now that we know the namespace ID, etc, we can pass this to the DN. + // The DN can now initialize its local storage if we are the + // first BP to handshake, etc. + dn.initBlockPool(this); + + // Second phase of the handshake with the NN. + register(); } /** @@ -875,7 +928,7 @@ public class DataNode extends Configured } } if (receivedAndDeletedBlockArray != null) { - bpNamenode.blockReceivedAndDeleted(bpRegistration, blockPoolId, + bpNamenode.blockReceivedAndDeleted(bpRegistration, getBlockPoolId(), receivedAndDeletedBlockArray); synchronized (receivedAndDeletedBlockList) { for (int i = 0; i < receivedAndDeletedBlockArray.length; i++) { @@ -897,9 +950,9 @@ public class DataNode extends Configured : "delHint is null"); } - if (!block.getBlockPoolId().equals(blockPoolId)) { + if (!block.getBlockPoolId().equals(getBlockPoolId())) { LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + " vs. " - + blockPoolId); + + getBlockPoolId()); return; } @@ -916,9 +969,9 @@ public class DataNode extends Configured throw new IllegalArgumentException("Block is null"); } - if (!block.getBlockPoolId().equals(blockPoolId)) { + if (!block.getBlockPoolId().equals(getBlockPoolId())) { LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + " vs. " - + blockPoolId); + + getBlockPoolId()); return; } @@ -941,11 +994,11 @@ public class DataNode extends Configured // Create block report long brCreateStartTime = now(); - BlockListAsLongs bReport = dn.data.getBlockReport(blockPoolId); + BlockListAsLongs bReport = dn.data.getBlockReport(getBlockPoolId()); // Send block report long brSendStartTime = now(); - cmd = bpNamenode.blockReport(bpRegistration, blockPoolId, bReport + cmd = bpNamenode.blockReport(bpRegistration, getBlockPoolId(), bReport .getBlockListAsLongs()); // Log the block report processing stats from Datanode perspective @@ -982,7 +1035,7 @@ public class DataNode extends Configured dn.data.getCapacity(), dn.data.getDfsUsed(), dn.data.getRemaining(), - dn.data.getBlockPoolUsed(blockPoolId), + dn.data.getBlockPoolUsed(getBlockPoolId()), dn.xmitsInProgress.get(), dn.getXceiverCount(), dn.data.getNumFailedVolumes()); } @@ -1039,7 +1092,7 @@ public class DataNode extends Configured // // Now loop for a long time.... // - while (dn.shouldRun && shouldServiceRun) { + while (shouldRun()) { try { long startTime = now(); @@ -1080,7 +1133,7 @@ public class DataNode extends Configured // Now safe to start scanning the block pool if (dn.blockScanner != null) { - dn.blockScanner.addBlockPool(this.blockPoolId); + dn.blockScanner.addBlockPool(this.getBlockPoolId()); } // @@ -1094,8 +1147,7 @@ public class DataNode extends Configured try { receivedAndDeletedBlockList.wait(waitTime); } catch (InterruptedException ie) { - LOG.warn("BPOfferService for block pool=" - + this.getBlockPoolId() + " received exception:" + ie); + LOG.warn("BPOfferService for " + this + " interrupted"); } } } // synchronized @@ -1104,7 +1156,7 @@ public class DataNode extends Configured if (UnregisteredNodeException.class.getName().equals(reClass) || DisallowedDatanodeException.class.getName().equals(reClass) || IncorrectVersionException.class.getName().equals(reClass)) { - LOG.warn("blockpool " + blockPoolId + " is shutting down", re); + LOG.warn(this + " is shutting down", re); shouldServiceRun = false; return; } @@ -1118,7 +1170,7 @@ public class DataNode extends Configured } catch (IOException e) { LOG.warn("IOException in offerService", e); } - } // while (shouldRun && shouldServiceRun) + } // while (shouldRun()) } // offerService /** @@ -1134,54 +1186,44 @@ public class DataNode extends Configured * @throws IOException */ void register() throws IOException { - LOG.info("in register: sid=" + bpRegistration.getStorageID() + ";SI=" - + bpRegistration.storageInfo); - - // build and layout versions should match - String nsBuildVer = bpNamenode.versionRequest().getBuildVersion(); - String stBuildVer = Storage.getBuildVersion(); + Preconditions.checkState(bpNSInfo != null, + "register() should be called after handshake()"); + + // The handshake() phase loaded the block pool storage + // off disk - so update the bpRegistration object from that info + bpRegistration = dn.createBPRegistration(bpNSInfo); - if (!nsBuildVer.equals(stBuildVer)) { - LOG.warn("Data-node and name-node Build versions must be " + - "the same. Namenode build version: " + nsBuildVer + "Datanode " + - "build version: " + stBuildVer); - throw new IncorrectVersionException(nsBuildVer, "namenode", stBuildVer); - } + LOG.info(this + " beginning handshake with NN"); - if (HdfsConstants.LAYOUT_VERSION != bpNSInfo.getLayoutVersion()) { - LOG.warn("Data-node and name-node layout versions must be " + - "the same. Expected: "+ HdfsConstants.LAYOUT_VERSION + - " actual "+ bpNSInfo.getLayoutVersion()); - throw new IncorrectVersionException - (bpNSInfo.getLayoutVersion(), "namenode"); - } - - while(dn.shouldRun && shouldServiceRun) { + while (shouldRun()) { try { // Use returned registration from namenode with updated machine name. bpRegistration = bpNamenode.registerDatanode(bpRegistration); - - LOG.info("bpReg after =" + bpRegistration.storageInfo + - ";sid=" + bpRegistration.storageID + ";name="+bpRegistration.getName()); - break; } catch(SocketTimeoutException e) { // namenode is busy LOG.info("Problem connecting to server: " + nnAddr); - try { - Thread.sleep(1000); - } catch (InterruptedException ie) {} + sleepAndLogInterrupts(1000, "connecting to server"); } } - dn.bpRegistrationSucceeded(bpRegistration, blockPoolId); - - LOG.info("in register:" + ";bpDNR="+bpRegistration.storageInfo); + LOG.info("Block pool " + this + " successfully registered with NN"); + dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId()); // random short delay - helps scatter the BR from all DNs scheduleBlockReport(dnConf.initialBlockReportDelay); } + private void sleepAndLogInterrupts(int millis, + String stateString) { + try { + Thread.sleep(millis); + } catch (InterruptedException ie) { + LOG.info("BPOfferService " + this + + " interrupted while " + stateString); + } + } + /** * No matter what kind of exception we get, keep retrying to offerService(). * That's the loop that connects to the NameNode and provides basic DataNode @@ -1192,49 +1234,43 @@ public class DataNode extends Configured */ @Override public void run() { - LOG.info(bpRegistration + "In BPOfferService.run, data = " + dn.data - + ";bp=" + blockPoolId); + LOG.info(this + " starting to offer service"); try { // init stuff try { // setup storage - setupBP(dn.conf); - register(); + connectToNNAndHandshake(); } catch (IOException ioe) { // Initial handshake, storage recovery or registration failed // End BPOfferService thread - LOG.fatal(bpRegistration + " initialization failed for block pool " - + blockPoolId, ioe); + LOG.fatal("Initialization failed for block pool " + this, ioe); return; } initialized = true; // bp is initialized; - while (dn.shouldRun && shouldServiceRun) { + while (shouldRun()) { try { startDistributedUpgradeIfNeeded(); offerService(); } catch (Exception ex) { - LOG.error("Exception in BPOfferService", ex); - if (dn.shouldRun && shouldServiceRun) { - try { - Thread.sleep(5000); - } catch (InterruptedException ie) { - LOG.warn("Received exception", ie); - } - } + LOG.error("Exception in BPOfferService for " + this, ex); + sleepAndLogInterrupts(5000, "offering service"); } } } catch (Throwable ex) { - LOG.warn("Unexpected exception", ex); + LOG.warn("Unexpected exception in block pool " + this, ex); } finally { - LOG.warn(bpRegistration + " ending block pool service for: " - + blockPoolId + " thread " + Thread.currentThread().getId()); + LOG.warn("Ending block pool service for: " + this); cleanUp(); } } + private boolean shouldRun() { + return shouldServiceRun && dn.shouldRun(); + } + /** * Process an array of datanode commands * @@ -1299,7 +1335,11 @@ public class DataNode extends Configured case DatanodeProtocol.DNA_REGISTER: // namenode requested a registration - at start or if NN lost contact LOG.info("DatanodeCommand action: DNA_REGISTER"); - if (dn.shouldRun && shouldServiceRun) { + if (shouldRun()) { + // re-retrieve namespace info to make sure that, if the NN + // was restarted, we still match its version (HDFS-2120) + retrieveNamespaceInfo(); + // and re-register register(); } break; @@ -1317,7 +1357,7 @@ public class DataNode extends Configured case DatanodeProtocol.DNA_ACCESSKEYUPDATE: LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE"); if (dn.isBlockTokenEnabled) { - dn.blockPoolTokenSecretManager.setKeys(blockPoolId, + dn.blockPoolTokenSecretManager.setKeys(getBlockPoolId(), ((KeyUpdateCommand) cmd).getExportedKeys()); } break; @@ -1346,7 +1386,7 @@ public class DataNode extends Configured synchronized UpgradeManagerDatanode getUpgradeManager() { if(upgradeManager == null) upgradeManager = - new UpgradeManagerDatanode(dn, blockPoolId); + new UpgradeManagerDatanode(dn, getBlockPoolId()); return upgradeManager; } @@ -1363,6 +1403,7 @@ public class DataNode extends Configured um.startUpgrade(); return; } + } /** @@ -1407,6 +1448,26 @@ public class DataNode extends Configured } /** + * Create a DatanodeRegistration for a specific block pool. + * @param nsInfo the namespace info from the first part of the NN handshake + */ + DatanodeRegistration createBPRegistration(NamespaceInfo nsInfo) { + DatanodeRegistration bpRegistration = createUnknownBPRegistration(); + String blockPoolId = nsInfo.getBlockPoolID(); + + bpRegistration.setStorageID(getStorageId()); + StorageInfo storageInfo = storage.getBPStorage(blockPoolId); + if (storageInfo == null) { + // it's null in the case of SimulatedDataSet + bpRegistration.storageInfo.layoutVersion = HdfsConstants.LAYOUT_VERSION; + bpRegistration.setStorageInfo(nsInfo); + } else { + bpRegistration.setStorageInfo(storageInfo); + } + return bpRegistration; + } + + /** * Check that the registration returned from a NameNode is consistent * with the information in the storage. If the storage is fresh/unformatted, * sets the storage ID based on this registration. @@ -1486,11 +1547,27 @@ public class DataNode extends Configured } } - void initBlockPool(BPOfferService bpOfferService, - NamespaceInfo nsInfo) throws IOException { + /** + * One of the Block Pools has successfully connected to its NN. + * This initializes the local storage for that block pool, + * checks consistency of the NN's cluster ID, etc. + * + * If this is the first block pool to register, this also initializes + * the datanode-scoped storage. + * + * @param nsInfo the handshake response from the NN. + * @throws IOException if the NN is inconsistent with the local storage. + */ + void initBlockPool(BPOfferService bpos) throws IOException { + NamespaceInfo nsInfo = bpos.getNamespaceInfo(); + Preconditions.checkState(nsInfo != null, + "Block pool " + bpos + " should have retrieved " + + "its namespace info before calling initBlockPool."); + String blockPoolId = nsInfo.getBlockPoolID(); - blockPoolManager.addBlockPool(bpOfferService); + // Register the new block pool with the BP manager. + blockPoolManager.addBlockPool(bpos); synchronized (this) { // we do not allow namenode from different cluster to register @@ -1521,12 +1598,21 @@ public class DataNode extends Configured + blockPoolId + ";lv=" + storage.getLayoutVersion() + ";nsInfo=" + nsInfo); } + + // In the case that this is the first block pool to connect, initialize + // the dataset, block scanners, etc. initFsDataSet(); - initPeriodicScanners(conf); - data.addBlockPool(nsInfo.getBlockPoolID(), conf); + initPeriodicScanners(conf); + + data.addBlockPool(blockPoolId, conf); } - private DatanodeRegistration createRegistration() { + /** + * Create a DatanodeRegistration object with no valid StorageInfo. + * This is used when reporting an error during handshake - ie + * before we can load any specific block pool. + */ + private DatanodeRegistration createUnknownBPRegistration() { DatanodeRegistration reg = new DatanodeRegistration(getMachineName()); reg.setInfoPort(infoServer.getPort()); reg.setIpcPort(getIpcPort()); @@ -2554,16 +2640,6 @@ public class DataNode extends Configured return bpos.bpNamenode; } - /** - * To be used by tests only to set a mock namenode in BPOfferService - */ - void setBPNamenode(String bpid, DatanodeProtocol namenode) { - BPOfferService bp = blockPoolManager.get(bpid); - if (bp != null) { - bp.setNameNode(namenode); - } - } - /** Block synchronization */ void syncBlock(RecoveringBlock rBlock, List syncList) throws IOException { @@ -2789,7 +2865,7 @@ public class DataNode extends Configured final Map info = new HashMap(); for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) { if (bpos != null && bpos.bpThread != null) { - info.put(bpos.getNNSocketAddress().getHostName(), bpos.blockPoolId); + info.put(bpos.getNNSocketAddress().getHostName(), bpos.getBlockPoolId()); } } return JSON.toString(info); @@ -2877,7 +2953,7 @@ public class DataNode extends Configured */ public boolean isDatanodeFullyStarted() { for (BPOfferService bp : blockPoolManager.getAllNamenodeThreads()) { - if (!bp.initialized() || !bp.isAlive()) { + if (!bp.isInitialized() || !bp.isAlive()) { return false; } } @@ -2904,4 +2980,8 @@ public class DataNode extends Configured DNConf getDnConf() { return dnConf; } + + boolean shouldRun() { + return shouldRun; + } } Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java?rev=1203943&r1=1203942&r2=1203943&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java Sat Nov 19 01:31:59 2011 @@ -21,10 +21,7 @@ package org.apache.hadoop.hdfs.server.da import java.io.IOException; -import org.apache.hadoop.hdfs.server.datanode.DataNode.BPOfferService; -import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; -import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; /** * Utility class for accessing package-private DataNode information during tests. @@ -41,27 +38,4 @@ public class DataNodeTestUtils { return dn.getDNRegistrationForBP(bpid); } - /** - * manually setup datanode to testing - * @param dn - datanode - * @param nsifno - namenode info - * @param bpid - block pool id - * @param nn - namenode object - * @throws IOException - */ - public static void setBPNamenodeByIndex(DataNode dn, - NamespaceInfo nsifno, String bpid, DatanodeProtocol nn) - throws IOException { - // setup the right BPOS.. - BPOfferService [] bposs = dn.getAllBpOs(); - if(bposs.length<0) { - throw new IOException("Datanode wasn't initializes with at least one NN"); - } - for(BPOfferService bpos : bposs) { - bpos.setNamespaceInfo(nsifno); - - dn.setBPNamenode(bpid, nn); - dn.initBlockPool(bpos, nsifno); - } - } } Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java?rev=1203943&r1=1203942&r2=1203943&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java Sat Nov 19 01:31:59 2011 @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.da import java.io.File; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.AbstractList; import static org.junit.Assert.fail; @@ -28,29 +29,37 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; +import org.apache.hadoop.hdfs.server.datanode.DataNode.BPOfferService; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.junit.Test; +import org.mockito.Mockito; public class TestDatanodeRegister { public static final Log LOG = LogFactory.getLog(TestDatanodeRegister.class); + + // Invalid address + static final InetSocketAddress INVALID_ADDR = + new InetSocketAddress("127.0.0.1", 1); + @Test public void testDataNodeRegister() throws Exception { - DataNode.BPOfferService myMockBPOS = mock(DataNode.BPOfferService.class); - doCallRealMethod().when(myMockBPOS).register(); - myMockBPOS.bpRegistration = mock(DatanodeRegistration.class); - when(myMockBPOS.bpRegistration.getStorageID()).thenReturn("myTestStorageID"); + DataNode mockDN = mock(DataNode.class); + Mockito.doReturn(true).when(mockDN).shouldRun(); + BPOfferService bpos = new DataNode.BPOfferService(INVALID_ADDR, mockDN); + NamespaceInfo fakeNSInfo = mock(NamespaceInfo.class); when(fakeNSInfo.getBuildVersion()).thenReturn("NSBuildVersion"); DatanodeProtocol fakeDNProt = mock(DatanodeProtocol.class); when(fakeDNProt.versionRequest()).thenReturn(fakeNSInfo); - doCallRealMethod().when(myMockBPOS).setNameNode(fakeDNProt); - myMockBPOS.setNameNode( fakeDNProt ); + + bpos.setNameNode( fakeDNProt ); + bpos.bpNSInfo = fakeNSInfo; try { - myMockBPOS.register(); + bpos.retrieveNamespaceInfo(); fail("register() did not throw exception! " + "Expected: IncorrectVersionException"); } catch (IncorrectVersionException ie) {