From: cutting@apache.org
To: hadoop-commits@lucene.apache.org
Reply-To: hadoop-dev@lucene.apache.org
Subject: svn commit: r506778 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java src/java/org/apache/hadoop/dfs/FSNamesystem.java src/java/org/apache/hadoop/dfs/NameNode.java
Date: Tue, 13 Feb 2007 00:51:29 -0000
Message-Id: <20070213005129.5A7701A981A@eris.apache.org>

Author: cutting
Date: Mon Feb 12 16:51:28 2007
New Revision: 506778

URL: http://svn.apache.org/viewvc?view=rev&rev=506778
Log:
HADOOP-923.  Move replication computation to a separate thread, to
improve heartbeat processing time.  Contributed by Dhruba.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=506778&r1=506777&r2=506778
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Feb 12 16:51:28 2007
@@ -42,6 +42,10 @@
 12. HADOOP-1010.  Add Reporter.NULL, a Reporter implementation that
     does nothing.  (Runping Qi via cutting)
 
+13. HADOOP-923.  In HDFS NameNode, move replication computation to a
+    separate thread, to improve heartbeat processing time.
+    (Dhruba Borthakur via cutting)
+
 Release 0.11.1 - 2007-02-09
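The log message describes the core idea: replication planning is expensive, and
doing it inside the heartbeat RPC handler slows every heartbeat. The patch
splits the work producer/consumer style: a background thread precomputes work,
and the handler only dequeues it. A minimal, self-contained sketch of that
general pattern, with purely illustrative names (this is not the Hadoop code):

    import java.util.LinkedList;
    import java.util.Queue;

    // A background thread precomputes expensive work so the
    // request handler stays fast.
    class WorkPrecomputer implements Runnable {
      private final Queue<String> pending = new LinkedList<String>();

      // Expensive planning runs here, off the request path.
      public void run() {
        while (!Thread.currentThread().isInterrupted()) {
          String work = planOneUnitOfWork();      // slow computation
          synchronized (pending) { pending.add(work); }
          try { Thread.sleep(3000); }             // recheck interval
          catch (InterruptedException ie) { return; }
        }
      }

      // The request handler (e.g. a heartbeat) only dequeues.
      String nextWork() {
        synchronized (pending) { return pending.poll(); }
      }

      private String planOneUnitOfWork() { return "replicate block X"; }
    }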
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java?view=diff&rev=506778&r1=506777&r2=506778
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java Mon Feb 12 16:51:28 2007
@@ -42,9 +42,19 @@
   // This is an optimization, because contains takes O(n) time on Arraylist
   protected boolean isAlive = false;
 
+  //
+  // List of blocks to be replicated by this datanode.
+  // Also, a list of datanodes per block to indicate the target
+  // datanodes of this replication.
+  //
+  List<Block> replicateBlocks;
+  List<DatanodeDescriptor[]> replicateTargetSets;
+  List<Block> invalidateBlocks;
+
   /** Default constructor */
   public DatanodeDescriptor() {
     super();
+    initWorkLists();
   }
 
   /** DatanodeDescriptor constructor
@@ -76,6 +86,7 @@
                              int xceiverCount ) {
     super( nodeID );
     updateHeartbeat(capacity, remaining, xceiverCount);
+    initWorkLists();
   }
 
   /** DatanodeDescriptor constructor
@@ -93,6 +104,16 @@
                              int xceiverCount ) {
     super( nodeID, networkLocation );
     updateHeartbeat( capacity, remaining, xceiverCount);
+    initWorkLists();
+  }
+
+  /*
+   * Initialize the lists that hold pending work for this datanode.
+   */
+  private void initWorkLists() {
+    replicateBlocks = new ArrayList<Block>();
+    replicateTargetSets = new ArrayList<DatanodeDescriptor[]>();
+    invalidateBlocks = new ArrayList<Block>();
   }
 
   /**
@@ -136,5 +157,109 @@
 
   Iterator getBlockIterator() {
     return blocks.iterator();
+  }
+
+  /*
+   * Store block replication work.
+   */
+  void addBlocksToBeReplicated(Block[] blocklist,
+                               DatanodeDescriptor[][] targets) {
+    assert(blocklist != null && targets != null);
+    assert(blocklist.length > 0 && targets.length > 0);
+    synchronized (replicateBlocks) {
+      assert(blocklist.length == targets.length);
+      for (int i = 0; i < blocklist.length; i++) {
+        replicateBlocks.add(blocklist[i]);
+        replicateTargetSets.add(targets[i]);
+      }
+    }
+  }
+
+  /*
+   * Store block invalidation work.
+   */
+  void addBlocksToBeInvalidated(Block[] blocklist) {
+    assert(blocklist != null && blocklist.length > 0);
+    synchronized (invalidateBlocks) {
+      for (int i = 0; i < blocklist.length; i++) {
+        invalidateBlocks.add(blocklist[i]);
+      }
+    }
+  }
+
+  /*
+   * The number of blocks that are pending to be replicated.
+   */
+  int getNumberOfBlocksToBeReplicated() {
+    synchronized (replicateBlocks) {
+      return replicateBlocks.size();
+    }
+  }
+
+  /*
+   * The number of block invalidation items that are pending to
+   * be sent to the datanode.
+   */
+  int getNumberOfBlocksToBeInvalidated() {
+    synchronized (invalidateBlocks) {
+      return invalidateBlocks.size();
+    }
+  }
+
+  /**
+   * Remove up to maxNumTransfers pending target sets, returning the
+   * corresponding blocks and targets through xferResults.
+   */
+  void getReplicationSets(int maxNumTransfers, Object[] xferResults) {
+    assert(xferResults.length == 2);
+    assert(xferResults[0] == null && xferResults[1] == null);
+
+    synchronized (replicateBlocks) {
+      assert(replicateBlocks.size() == replicateTargetSets.size());
+
+      if (maxNumTransfers <= 0 || replicateBlocks.size() == 0) {
+        return;
+      }
+      int numTransfers = 0;
+      int numBlocks = 0;
+      int i;
+      for (i = 0; i < replicateTargetSets.size() &&
+                  numTransfers < maxNumTransfers; i++) {
+        numTransfers += replicateTargetSets.get(i).length;
+      }
+      numBlocks = i;
+      Block[] blocklist = new Block[numBlocks];
+      DatanodeDescriptor targets[][] = new DatanodeDescriptor[numBlocks][];
+
+      for (i = 0; i < numBlocks; i++) {
+        blocklist[i] = replicateBlocks.get(0);
+        targets[i] = replicateTargetSets.get(0);
+        replicateBlocks.remove(0);
+        replicateTargetSets.remove(0);
+      }
+      xferResults[0] = blocklist;
+      xferResults[1] = targets;
+      assert(blocklist.length > 0 && targets.length > 0);
+    }
+  }
+
+  /**
+   * Remove up to maxblocks blocks pending invalidation, returning
+   * them through xferResults[0].
+   */
+  void getInvalidateBlocks(int maxblocks, Object[] xferResults) {
+    assert(xferResults[0] == null);
+
+    synchronized (invalidateBlocks) {
+      if (maxblocks <= 0 || invalidateBlocks.size() == 0) {
+        return;
+      }
+      int outnum = Math.min(maxblocks, invalidateBlocks.size());
+      Block[] blocklist = new Block[outnum];
+      for (int i = 0; i < outnum; i++) {
+        blocklist[i] = invalidateBlocks.get(0);
+        invalidateBlocks.remove(0);
+      }
+      assert(blocklist.length > 0);
+      xferResults[0] = blocklist;
+    }
   }
 }
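DatanodeDescriptor now carries per-datanode work queues. replicateBlocks and
replicateTargetSets are parallel lists that must stay index-aligned, which is
why every mutation happens under the replicateBlocks lock and
getReplicationSets() asserts equal sizes. A single list of pairs would make
that invariant structural rather than asserted; here is a sketch of that
alternative shape, using a hypothetical BlockTargetPair class that is not part
of this patch (Object stands in for Block and DatanodeDescriptor[]):

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative alternative to the patch's parallel lists: one
    // list of pairs cannot fall out of index-alignment.
    class BlockTargetPair {
      final Object block;       // stands in for Block
      final Object[] targets;   // stands in for DatanodeDescriptor[]
      BlockTargetPair(Object block, Object[] targets) {
        this.block = block;
        this.targets = targets;
      }
    }

    class ReplicationWorkList {
      private final List<BlockTargetPair> work =
          new ArrayList<BlockTargetPair>();

      synchronized void add(Object block, Object[] targets) {
        work.add(new BlockTargetPair(block, targets));
      }

      // Dequeue from the head, mirroring getReplicationSets().
      synchronized BlockTargetPair poll() {
        return work.isEmpty() ? null : work.remove(0);
      }
    }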
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=506778&r1=506777&r2=506778
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Mon Feb 12 16:51:28 2007
@@ -172,6 +172,7 @@
     Daemon hbthread = null;   // HeartbeatMonitor thread
     Daemon lmthread = null;   // LeaseMonitor thread
     Daemon smmthread = null;  // SafeModeMonitor thread
+    Daemon replthread = null; // Replication thread
     boolean fsRunning = true;
     long systemStart = 0;
 
@@ -186,6 +187,10 @@
     // heartbeatExpireInterval is how long namenode waits for datanode to report
     // heartbeat
     private long heartbeatExpireInterval;
+    // replicationRecheckInterval is how often the namenode checks for new replication work
+    private long replicationRecheckInterval;
+    static int replIndex = 0; // last datanode used for replication work
+    static int REPL_WORK_PER_ITERATION = 32; // max percent of datanodes per iteration
 
     public static FSNamesystem fsNamesystemObject;
     private String localMachine;
@@ -228,6 +233,7 @@
         this.heartbeatRecheckInterval = 5 * 60 * 1000; // 5 minutes
         this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval +
             10 * heartbeatInterval;
+        this.replicationRecheckInterval = 3 * 1000; // 3 seconds
 
         this.localMachine = hostname;
         this.port = port;
@@ -237,8 +243,10 @@
         setBlockTotal();
         this.hbthread = new Daemon(new HeartbeatMonitor());
         this.lmthread = new Daemon(new LeaseMonitor());
+        this.replthread = new Daemon(new ReplicationMonitor());
         hbthread.start();
         lmthread.start();
+        replthread.start();
 
         this.systemStart = now();
         this.startTime = new Date(systemStart);
@@ -280,6 +288,7 @@
         try {
             infoServer.stop();
             hbthread.join(3000);
+            replthread.join(3000);
         } catch (InterruptedException ie) {
         } finally {
             // using finally to ensure we also wait for lease daemon
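The new replication thread follows the same lifecycle as the existing
monitors: it is wrapped in Hadoop's Daemon (which marks the thread as a daemon
thread so it cannot block JVM exit), started in the constructor, and joined
with a 3-second timeout on shutdown. A generic sketch of that lifecycle using
a plain java.lang.Thread, with illustrative names:

    // Generic daemon-monitor lifecycle, mirroring how replthread is
    // created, started, and joined in this patch.
    class MonitorLifecycle {
      private volatile boolean running = true;

      private final Thread monitor = new Thread(new Runnable() {
        public void run() {
          while (running) {
            // ... periodic work goes here ...
            try { Thread.sleep(3000); }
            catch (InterruptedException ie) { /* loop re-checks running */ }
          }
        }
      });

      void start() {
        monitor.setDaemon(true); // never blocks JVM shutdown
        monitor.start();
      }

      void stop() throws InterruptedException {
        running = false;
        monitor.interrupt();
        monitor.join(3000);      // bounded wait, as in close()
      }
    }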
@@ -435,6 +444,14 @@
             int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
             int curPri = getPriority(block, curReplicas, curExpectedReplicas);
             int oldPri = getPriority(block, oldReplicas, oldExpectedReplicas);
+            NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " +
+                                          block +
+                                          " curReplicas " + curReplicas +
+                                          " curExpectedReplicas " + curExpectedReplicas +
+                                          " oldReplicas " + oldReplicas +
+                                          " oldExpectedReplicas " + oldExpectedReplicas +
+                                          " curPri " + curPri +
+                                          " oldPri " + oldPri);
             if( oldPri != LEVEL && oldPri != curPri ) {
                 remove(block, oldPri);
             }
@@ -1575,7 +1592,10 @@
     public boolean gotHeartbeat( DatanodeID nodeID,
                                  long capacity,
                                  long remaining,
-                                 int xceiverCount
+                                 int xceiverCount,
+                                 int xmitsInProgress,
+                                 Object[] xferResults,
+                                 Object deleteList[]
                                  ) throws IOException {
       synchronized (heartbeats) {
         synchronized (datanodeMap) {
@@ -1595,6 +1615,16 @@
                 updateStats(nodeinfo, false);
                 nodeinfo.updateHeartbeat(capacity, remaining, xceiverCount);
                 updateStats(nodeinfo, true);
+                //
+                // Extract pending replication work or block invalidation
+                // work from the datanode descriptor.
+                //
+                nodeinfo.getReplicationSets(this.maxReplicationStreams -
+                                            xmitsInProgress, xferResults);
+                if (xferResults[0] == null) {
+                    nodeinfo.getInvalidateBlocks(FSConstants.BLOCK_INVALIDATE_CHUNK,
+                                                 deleteList);
+                }
                 return false;
             }
         }
@@ -1634,6 +1664,130 @@
     }
 
     /**
+     * Periodically calls computeDatanodeWork().
+     */
+    class ReplicationMonitor implements Runnable {
+        public void run() {
+            while (fsRunning) {
+                try {
+                    computeDatanodeWork();
+                    Thread.sleep(replicationRecheckInterval);
+                } catch (InterruptedException ie) {
+                } catch (IOException ie) {
+                    LOG.warn("ReplicationMonitor thread received exception. " + ie);
+                }
+            }
+        }
+    }
+
+    /**
+     * Look at a few datanodes and compute any replication work that
+     * can be scheduled on them. The datanode will be informed of this
+     * work at the next heartbeat.
+     */
+    void computeDatanodeWork() throws IOException {
+        int numiter = 0;
+        int foundwork = 0;
+        int hsize = 0;
+
+        while (true) {
+            DatanodeDescriptor node = null;
+
+            //
+            // Pick the datanode that was the last one in the
+            // previous invocation of this method.
+            //
+            synchronized (heartbeats) {
+                hsize = heartbeats.size();
+                if (numiter++ >= hsize) {
+                    break;
+                }
+                if (replIndex >= hsize) {
+                    replIndex = 0;
+                }
+                node = heartbeats.get(replIndex);
+                replIndex++;
+            }
+
+            //
+            // Is there replication work to be computed for this datanode?
+            //
+            int precomputed = node.getNumberOfBlocksToBeReplicated();
+            int needed = this.maxReplicationStreams - precomputed;
+            boolean doReplication = false;
+            boolean doInvalidation = false;
+            if (needed > 0) {
+                //
+                // Compute replication work and store work into the datanode
+                //
+                Object replsets[] = pendingTransfers(node, needed);
+                if (replsets != null) {
+                    doReplication = true;
+                    addBlocksToBeReplicated(node, (Block[])replsets[0],
+                                            (DatanodeDescriptor[][])replsets[1]);
+                }
+            }
+            if (!doReplication) {
+                //
+                // Determine if block deletion is pending for this datanode
+                //
+                Block blocklist[] = blocksToInvalidate(node);
+                if (blocklist != null) {
+                    doInvalidation = true;
+                    addBlocksToBeInvalidated(node, blocklist);
+                }
+            }
+            if (doReplication || doInvalidation) {
+                //
+                // If we have already computed work for a predefined
+                // number of datanodes in this iteration, then relax.
+                //
+                if (foundwork > ((hsize * REPL_WORK_PER_ITERATION)/100)) {
+                    break;
+                }
+                foundwork++;
+            } else {
+                //
+                // See if the decommissioned node has finished moving all
+                // its datablocks to another replica. This is a loose
+                // heuristic to determine when a decommission is really over.
+                //
+                checkDecommissionState(node);
+            }
+        }
+    }
+
+    /**
+     * Add more replication work for this datanode.
+     */
+    synchronized void addBlocksToBeReplicated(DatanodeDescriptor node,
+                                              Block[] blocklist,
+                                              DatanodeDescriptor[][] targets)
+                                              throws IOException {
+        //
+        // Find the datanode with the FSNamesystem lock held.
+        //
+        DatanodeDescriptor n = getDatanode(node);
+        if (n != null) {
+            n.addBlocksToBeReplicated(blocklist, targets);
+        }
+    }
+
+    /**
+     * Add more block invalidation work for this datanode.
+     */
+    synchronized void addBlocksToBeInvalidated(DatanodeDescriptor node,
+                                               Block[] blocklist) throws IOException {
+        //
+        // Find the datanode with the FSNamesystem lock held.
+        //
+        DatanodeDescriptor n = getDatanode(node);
+        if (n != null) {
+            n.addBlocksToBeInvalidated(blocklist);
+        }
+    }
+
+    /**
      * remove a datanode descriptor
      * @param nodeID datanode ID
      * @author hairong
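computeDatanodeWork() keeps a static cursor, replIndex, so each invocation
resumes where the previous one stopped and successive passes cover the cluster
round-robin. REPL_WORK_PER_ITERATION caps how much of the cluster gets new
work per pass: with 200 live datanodes and the value 32, a pass stops after
finding work for roughly (200 * 32) / 100 = 64 of them. A compact,
self-contained sketch of that scan shape (illustrative names and a stand-in
work predicate, not the Hadoop code):

    import java.util.Arrays;
    import java.util.List;

    class RoundRobinScan {
      static int cursor = 0;               // analogous to replIndex
      static final int WORK_PERCENT = 32;  // like REPL_WORK_PER_ITERATION

      static void scan(List<String> nodes) {
        int visited = 0;
        int found = 0;
        while (visited++ < nodes.size()) { // at most one full pass
          if (cursor >= nodes.size()) {
            cursor = 0;                    // wrap around
          }
          String node = nodes.get(cursor++);
          boolean hasWork = node.length() % 2 == 0; // stand-in predicate
          if (hasWork) {
            // cap the number of nodes given work in a single pass
            if (found > nodes.size() * WORK_PERCENT / 100) {
              break;
            }
            found++;
          }
        }
      }

      public static void main(String[] args) {
        scan(Arrays.asList("dn1", "dn2", "dn3", "dn4"));
      }
    }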
+ // + int precomputed = node.getNumberOfBlocksToBeReplicated(); + int needed = this.maxReplicationStreams - precomputed; + boolean doReplication = false; + boolean doInvalidation = false; + if (needed > 0) { + // + // Compute replication work and store work into the datanode + // + Object replsets[] = pendingTransfers(node, needed); + if (replsets != null) { + doReplication = true; + addBlocksToBeReplicated(node, (Block[])replsets[0], + (DatanodeDescriptor[][])replsets[1]); + } + } + if (!doReplication) { + // + // Determine if block deletion is pending for this datanode + // + Block blocklist[] = blocksToInvalidate(node); + if (blocklist != null) { + doInvalidation = true; + addBlocksToBeInvalidated(node, blocklist); + } + } + if (doReplication || doInvalidation) { + // + // If we have already computed work for a predefined + // number of datanodes in this iteration, then relax + // + if (foundwork > ((hsize * REPL_WORK_PER_ITERATION)/100)) { + break; + } + foundwork++; + } else { + // + // See if the decommissioned node has finished moving all + // its datablocks to another replica. This is a loose + // heuristic to determine when a decommission is really over. + // + checkDecommissionState(node); + } + } + } + + /** + * Add more replication work for this datanode. + */ + synchronized void addBlocksToBeReplicated(DatanodeDescriptor node, + Block[] blocklist, + DatanodeDescriptor[][] targets) + throws IOException { + // + // Find the datanode with the FSNamesystem lock held. + // + DatanodeDescriptor n = getDatanode(node); + if (n != null) { + n.addBlocksToBeReplicated(blocklist, targets); + } + } + + /** + * Add more block invalidation work for this datanode. + */ + synchronized void addBlocksToBeInvalidated(DatanodeDescriptor node, + Block[] blocklist) throws IOException { + // + // Find the datanode with the FSNamesystem lock held. + // + DatanodeDescriptor n = getDatanode(node); + if (n != null) { + n.addBlocksToBeInvalidated(blocklist); + } + } + + /** * remove a datanode descriptor * @param nodeID datanode ID * @author hairong @@ -2405,7 +2559,7 @@ * */ public synchronized Object[] pendingTransfers(DatanodeID srcNode, - int xmitsInProgress) { + int needed) { // Ask datanodes to perform block replication // only if safe mode is off. 
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?view=diff&rev=506778&r1=506777&r2=506778
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Mon Feb 12 16:51:28 2007
@@ -579,18 +579,27 @@
                                    long remaining,
                                    int xmitsInProgress,
                                    int xceiverCount) throws IOException {
+        Object xferResults[] = new Object[2];
+        xferResults[0] = xferResults[1] = null;
+        Object deleteList[] = new Object[1];
+        deleteList[0] = null;
+
         verifyRequest( nodeReg );
-        if( namesystem.gotHeartbeat( nodeReg, capacity, remaining, xceiverCount )) {
+        if( namesystem.gotHeartbeat( nodeReg, capacity, remaining,
+                                     xceiverCount,
+                                     xmitsInProgress,
+                                     xferResults,
+                                     deleteList)) {
             // request block report from the datanode
+            assert(xferResults[0] == null && deleteList[0] == null);
            return new BlockCommand( DataNodeAction.DNA_REGISTER );
         }
 
         //
         // Ask to perform pending transfers, if any
         //
-        Object xferResults[] = namesystem.pendingTransfers( nodeReg,
-                                                            xmitsInProgress );
-        if (xferResults != null) {
+        if (xferResults[0] != null) {
+            assert(deleteList[0] == null);
             return new BlockCommand((Block[]) xferResults[0],
                                     (DatanodeInfo[][]) xferResults[1]);
         }
@@ -600,18 +609,9 @@
         // a block report. This is just a small fast removal of blocks that have
         // just been removed.
         //
-        Block blocks[] = namesystem.blocksToInvalidate( nodeReg );
-        if (blocks != null) {
-            return new BlockCommand(blocks);
+        if (deleteList[0] != null) {
+            return new BlockCommand((Block[]) deleteList[0]);
         }
-        //
-        // See if the decommissioned node has finished moving all
-        // its datablocks to another replica. This is a loose
-        // heuristic to determine when a decommission is really over.
-        // We can probbaly do it in a seperate thread rather than making
-        // the heartbeat thread do this.
-        //
-        namesystem.checkDecommissionState(nodeReg);
         return null;
     }
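sendHeartbeat() now allocates xferResults and deleteList up front and passes
them into gotHeartbeat() as out-parameters; the namesystem fills in at most
one of them, and the reply is chosen in priority order: re-register first,
then replication work, then invalidation, else no command. A typed holder
would make that single-command-per-heartbeat rule explicit; the sketch below
uses a hypothetical HeartbeatReply class (not part of this patch) to show the
same selection logic:

    // Illustrative alternative to the Object[] out-parameters: a
    // small typed holder for the heartbeat reply. All names here
    // are hypothetical.
    class HeartbeatReply {
      Object[] replicateBlocks;    // stands in for Block[]
      Object[][] replicateTargets; // stands in for DatanodeInfo[][]
      Object[] invalidateBlocks;   // stands in for Block[]
      boolean reRegister;
    }

    class HeartbeatResponder {
      // Mirrors the priority order in NameNode.sendHeartbeat().
      String toCommand(HeartbeatReply r) {
        if (r.reRegister) return "register";
        if (r.replicateBlocks != null) return "replicate";
        if (r.invalidateBlocks != null) return "invalidate";
        return "no-op";
      }
    }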