To: core-commits@hadoop.apache.org
From: shv@apache.org
Reply-To: core-dev@hadoop.apache.org
Subject: svn commit: r674675 - in /hadoop/core/branches/branch-0.18: CHANGES.txt src/hdfs/org/apache/hadoop/dfs/DatanodeDescriptor.java src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java src/hdfs/org/apache/hadoop/dfs/NameNode.java
Date: Mon, 07 Jul 2008 23:42:30 -0000
Message-Id: <20080707234231.254DB2388A23@eris.apache.org>
X-Mailer: svnmailer-1.0.8

Author: shv
Date: Mon Jul 7 16:42:30 2008
New Revision: 674675

URL: http://svn.apache.org/viewvc?rev=674675&view=rev
Log:
Revert changes for revisions 674657-674658 related to HADOOP-3002.

Modified:
    hadoop/core/branches/branch-0.18/CHANGES.txt
    hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DatanodeDescriptor.java
    hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java
    hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/NameNode.java

Modified: hadoop/core/branches/branch-0.18/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/CHANGES.txt?rev=674675&r1=674674&r2=674675&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.18/CHANGES.txt Mon Jul 7 16:42:30 2008
@@ -755,8 +755,6 @@
     HADOOP-3633. Correct exception handling in DataXceiveServer, and throttle
     the number of xceiver threads in a data-node. (shv)
 
-    HADOOP-3002. HDFS should not remove blocks while in safemode.
-    (shv)
 
 Release 0.17.0 - 2008-05-18
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DatanodeDescriptor.java?rev=674675&r1=674674&r2=674675&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DatanodeDescriptor.java (original)
+++ hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DatanodeDescriptor.java Mon Jul 7 16:42:30 2008
@@ -344,8 +344,7 @@
   void reportDiff(BlocksMap blocksMap,
                   BlockListAsLongs newReport,
                   Collection<Block> toAdd,
-                  Collection<Block> toRemove,
-                  Collection<Block> toInvalidate) {
+                  Collection<Block> toRemove) {
     // place a delimiter in the list which separates blocks
     // that have been reported from those that have not
     BlockInfo delimiter = new BlockInfo(new Block(), 1);
@@ -361,9 +360,8 @@
       iblk.set(newReport.getBlockId(i), newReport.getBlockLen(i),
                newReport.getBlockGenStamp(i));
       BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
-      if(storedBlock == null) {
-        // If block is not in blocksMap it does not belong to any file
-        toInvalidate.add(new Block(iblk));
+      if(storedBlock == null) { // Brand new block
+        toAdd.add(new Block(iblk));
         continue;
       }
       if(storedBlock.findDatanode(this) < 0) {// Known block, but not on the DN

Modified: hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java?rev=674675&r1=674674&r2=674675&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java Mon Jul 7 16:42:30 2008
@@ -2085,36 +2085,37 @@
         nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
         updateStats(nodeinfo, true);
-        // If the datanode has (just) been resolved and we haven't ever processed
-        // a block report from it yet, ask for one now.
-        if (!blockReportProcessed(nodeReg)) {
-          // If we never processed a block report from this datanode, we shouldn't
-          // have any work for that as well
-          assert(cmd == null);
-          if (isResolved(nodeReg)) {
-            return DatanodeCommand.BLOCKREPORT;
-          }
-        }
-
-        if (isInSafeMode()) {
-          //check distributed upgrade
-          return getDistributedUpgradeCommand();
-        }
-
-        // All other commands are not allowed in safe mode.
         //check lease recovery
-        cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
-        if (cmd != null)
-          return cmd;
+        if (cmd == null) {
+          cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
+        }
         //check pending replication
-        cmd = nodeinfo.getReplicationCommand(
-            maxReplicationStreams - xmitsInProgress);
-        if (cmd != null)
-          return cmd;
+        if (cmd == null) {
+          cmd = nodeinfo.getReplicationCommand(
+                maxReplicationStreams - xmitsInProgress);
+        }
         //check block invalidation
-        return nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
+        if (cmd == null) {
+          cmd = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
+        }
       }
     }
+
+    // If the datanode has (just) been resolved and we haven't ever processed
+    // a block report from it yet, ask for one now.
+    if (!blockReportProcessed(nodeReg)) {
+      // If we never processed a block report from this datanode, we shouldn't
+      // have any work for that as well
+      assert(cmd == null);
+      if (isResolved(nodeReg)) {
+        return DatanodeCommand.BLOCKREPORT;
+      }
+    }
+    //check distributed upgrade
+    if (cmd == null) {
+      cmd = getDistributedUpgradeCommand();
+    }
+    return cmd;
   }
 
   private void updateStats(DatanodeDescriptor node, boolean isAdded) {
@@ -2197,9 +2198,6 @@
     int workFound = 0;
     int blocksToProcess = 0;
     int nodesToProcess = 0;
-    // blocks should not be replicated or removed if safe mode is on
-    if (isInSafeMode())
-      return workFound;
     synchronized(heartbeats) {
       blocksToProcess = (int)(heartbeats.size()
           * ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
@@ -2242,6 +2240,9 @@
   private synchronized int computeReplicationWork(
                                   int blocksToProcess) throws IOException {
     int scheduledReplicationCount = 0;
+    // blocks should not be replicated or removed if safe mode is on
+    if (isInSafeMode())
+      return scheduledReplicationCount;
 
     synchronized(neededReplications) {
       // # of blocks to process equals either twice the number of live
@@ -2617,9 +2618,9 @@
    * The given node is reporting all its blocks.  Use this info to
    * update the (machine-->blocklist) and (block-->machinelist) tables.
    */
-  public synchronized void processReport(DatanodeID nodeID,
-                                         BlockListAsLongs newReport
-                                         ) throws IOException {
+  public synchronized Block[] processReport(DatanodeID nodeID,
+                                            BlockListAsLongs newReport
+                                            ) throws IOException {
     long startTime = now();
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
@@ -2641,7 +2642,7 @@
     if (node.getNetworkLocation().equals(NetworkTopology.UNRESOLVED)) {
       LOG.info("Ignoring block report from " + nodeID.getName() +
           " because rack location for this datanode is still to be resolved.");
-      return; //drop the block report if the dn hasn't been resolved
+      return null; //drop the block report if the dn hasn't been resolved
     }
     node.setBlockReportProcessed(true);
@@ -2651,8 +2652,7 @@
     //
     Collection<Block> toAdd = new LinkedList<Block>();
     Collection<Block> toRemove = new LinkedList<Block>();
-    Collection<Block> toInvalidate = new LinkedList<Block>();
-    node.reportDiff(blocksMap, newReport, toAdd, toRemove, toInvalidate);
+    node.reportDiff(blocksMap, newReport, toAdd, toRemove);
 
     for (Block b : toRemove) {
       removeStoredBlock(b, node);
@@ -2660,13 +2660,41 @@
     for (Block b : toAdd) {
       addStoredBlock(b, node, null);
     }
-    for (Block b : toInvalidate) {
-      NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: block "
-          + b + " on " + node.getName() + " size " + b.getNumBytes()
-          + " does not belong to any file.");
-      addToInvalidates(b, node);
+
+    //
+    // We've now completely updated the node's block report profile.
+    // We now go through all its blocks and find which ones are invalid,
+    // no longer pending, or over-replicated.
+    //
+    // (Note it's not enough to just invalidate blocks at lease expiry
+    // time; datanodes can go down before the client's lease on
+    // the failed file expires and miss the "expire" event.)
+    //
+    // This function considers every block on a datanode, and thus
+    // should only be invoked infrequently.
+    //
+    Collection<Block> obsolete = new ArrayList<Block>();
+    for (Iterator<Block> it = node.getBlockIterator(); it.hasNext();) {
+      Block b = it.next();
+
+      //
+      // A block report can only send BLOCK_INVALIDATE_CHUNK number of
+      // blocks to be deleted.
+      // If there are more blocks to be deleted,
+      // they are added to recentInvalidateSets and will be sent out
+      // through succeeding heartbeat responses.
+      //
+      if (!isValidBlock(b)) {
+        if (obsolete.size() > blockInvalidateLimit) {
+          addToInvalidates(b, node);
+        } else {
+          obsolete.add(b);
+        }
+        NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
+            +"ask "+nodeID.getName()+" to delete "+b);
+      }
     }
     NameNode.getNameNodeMetrics().blockReport.inc((int) (now() - startTime));
+    return obsolete.toArray(new Block[obsolete.size()]);
   }
 
   /**
@@ -2678,6 +2706,7 @@
                                     DatanodeDescriptor node,
                                     DatanodeDescriptor delNodeHint) {
     BlockInfo storedBlock = blocksMap.getStoredBlock(block);
+    INodeFile fileINode = null;
     boolean added = false;
     if(storedBlock == null) { // block is not in the blocksMaps
       // add block to the blocksMap and to the data-node
@@ -2689,6 +2718,7 @@
     }
     assert storedBlock != null : "Block must be stored by now";
+    fileINode = storedBlock.getINode();
     if (block != storedBlock) {
       if (block.getNumBytes() > 0) {
         long cursize = storedBlock.getNumBytes();
@@ -2763,8 +2793,17 @@
           + block + " on " + node.getName() + " size " + block.getNumBytes());
     }
-
-    assert isValidBlock(storedBlock) : "Trying to add an invalid block";
+    //
+    // If this block does not belong to any file, then we are done.
+    //
+    if (fileINode == null) {
+      NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
+                                   + "addStoredBlock request received for "
+                                   + block + " on " + node.getName()
+                                   + " size " + block.getNumBytes()
+                                   + " but it does not belong to any file.");
+      return block;
+    }
 
     // filter out containingNodes that are marked for decommission.
     NumberReplicas num = countNodes(storedBlock);
@@ -2779,8 +2818,6 @@
     // if file is being actively written to, then do not check
     // replication-factor here. It will be checked when the file is closed.
     //
-    INodeFile fileINode = null;
-    fileINode = storedBlock.getINode();
     if (fileINode.isUnderConstruction()) {
       return block;
     }

Modified: hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/NameNode.java?rev=674675&r1=674674&r2=674675&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/NameNode.java (original)
+++ hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/NameNode.java Mon Jul 7 16:42:30 2008
@@ -610,7 +610,9 @@
     stateChangeLog.debug("*BLOCK* NameNode.blockReport: "
            +"from "+nodeReg.getName()+" "+blist.getNumberOfBlocks()
            +" blocks");
-    namesystem.processReport(nodeReg, blist);
+    Block blocksToDelete[] = namesystem.processReport(nodeReg, blist);
+    if (blocksToDelete != null && blocksToDelete.length > 0)
+      return new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blocksToDelete);
     if (getFSImage().isUpgradeFinalized())
       return DatanodeCommand.FINALIZE;
     return null;
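
A note on the behavior this revert restores: in handleHeartbeat, each kind of
datanode work (lease recovery, pending replication, block invalidation,
distributed upgrade) is consulted in order, and a later source is asked only
while no earlier source has produced a command, so a datanode receives at most
one command per heartbeat. The sketch below shows that first-match dispatch
pattern in isolation. It uses simplified stand-in types (Command, WorkSource)
rather than the actual branch-0.18 classes, so treat it as an illustrative
assumption, not Hadoop code.

    import java.util.ArrayList;
    import java.util.List;

    public class HeartbeatDispatchSketch {

      // Stand-in for DatanodeCommand.
      static class Command {
        final String action;
        Command(String action) { this.action = action; }
      }

      // Stand-in for a source of datanode work; returns null when it has
      // no command to issue this heartbeat.
      interface WorkSource {
        Command nextCommand();
      }

      // First-match dispatch: a later source is consulted only while no
      // earlier source has produced a command, so at most one command is
      // handed out per heartbeat.
      static Command handleHeartbeat(List<WorkSource> sources) {
        Command cmd = null;
        for (WorkSource source : sources) {
          if (cmd == null) {
            cmd = source.nextCommand();
          }
        }
        return cmd;
      }

      public static void main(String[] args) {
        List<WorkSource> sources = new ArrayList<WorkSource>();
        // Lease recovery: no work this heartbeat.
        sources.add(new WorkSource() {
          public Command nextCommand() { return null; }
        });
        // Pending replication: has work, so it wins.
        sources.add(new WorkSource() {
          public Command nextCommand() { return new Command("REPLICATE"); }
        });
        // Block invalidation: never consulted, because a command was found.
        sources.add(new WorkSource() {
          public Command nextCommand() { return new Command("INVALIDATE"); }
        });
        System.out.println(handleHeartbeat(sources).action); // prints REPLICATE
      }
    }

In the same spirit, the restored processReport hands back roughly
blockInvalidateLimit obsolete blocks with the block-report reply itself
(NameNode.blockReport wraps them in a DNA_INVALIDATE BlockCommand), while any
excess goes through addToInvalidates and is sent out via succeeding heartbeat
responses, as the restored comment describes.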