Return-Path: Delivered-To: apmail-lucene-hadoop-commits-archive@locus.apache.org Received: (qmail 20181 invoked from network); 28 Feb 2007 20:17:27 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 28 Feb 2007 20:17:27 -0000 Received: (qmail 90200 invoked by uid 500); 28 Feb 2007 20:17:35 -0000 Delivered-To: apmail-lucene-hadoop-commits-archive@lucene.apache.org Received: (qmail 90182 invoked by uid 500); 28 Feb 2007 20:17:35 -0000 Mailing-List: contact hadoop-commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hadoop-dev@lucene.apache.org Delivered-To: mailing list hadoop-commits@lucene.apache.org Received: (qmail 90173 invoked by uid 99); 28 Feb 2007 20:17:35 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 28 Feb 2007 12:17:35 -0800 X-ASF-Spam-Status: No, hits=-99.5 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 28 Feb 2007 12:17:26 -0800 Received: by eris.apache.org (Postfix, from userid 65534) id 1427E1A981A; Wed, 28 Feb 2007 12:17:06 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r512944 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/dfs/FSNamesystem.java src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java src/test/org/apache/hadoop/dfs/TestPendingReplication.java Date: Wed, 28 Feb 2007 20:17:05 -0000 To: hadoop-commits@lucene.apache.org From: cutting@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070228201706.1427E1A981A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cutting Date: Wed Feb 28 12:17:05 2007 New Revision: 512944 URL: http://svn.apache.org/viewvc?view=rev&rev=512944 Log: HADOOP-940. Improve HDFS's replication scheduling. Contributed by Dhruba. Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestPendingReplication.java Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=512944&r1=512943&r2=512944 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Wed Feb 28 12:17:05 2007 @@ -162,6 +162,9 @@ 48. HADOOP-1043. Optimize shuffle, increasing parallelism. (Devaraj Das via cutting) +49. HADOOP-940. Improve HDFS's replication scheduling. + (Dhruba Borthakur via cutting) + Release 0.11.2 - 2007-02-16 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=512944&r1=512943&r2=512944 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Wed Feb 28 12:17:05 2007 @@ -156,7 +156,7 @@ // Set of: Block // private UnderReplicationBlocks neededReplications = new UnderReplicationBlocks(); - private Collection pendingReplications = new TreeSet(); + private PendingReplicationBlocks pendingReplications; // // Used for handling lock-leases @@ -248,6 +248,7 @@ this.dir.loadFSImage( conf ); this.safeMode = new SafeModeInfo( conf ); setBlockTotal(); + pendingReplications = new PendingReplicationBlocks(LOG); this.hbthread = new Daemon(new HeartbeatMonitor()); this.lmthread = new Daemon(new LeaseMonitor()); this.replthread = new Daemon(new ReplicationMonitor()); @@ -298,6 +299,7 @@ fsRunning = false; } try { + pendingReplications.stop(); infoServer.stop(); hbthread.join(3000); replthread.join(3000); @@ -1706,6 +1708,7 @@ while (fsRunning) { try { computeDatanodeWork(); + processPendingReplications(); Thread.sleep(replicationRecheckInterval); } catch (InterruptedException ie) { } catch (IOException ie) { @@ -1793,6 +1796,21 @@ } /** + * If there were any replication requests that timed out, reap them + * and put them back into the neededReplication queue + */ + void processPendingReplications() { + Block[] timedOutItems = pendingReplications.getTimedOutBlocks(); + if (timedOutItems != null) { + synchronized (this) { + for (int i = 0; i < timedOutItems.length; i++) { + neededReplications.add(timedOutItems[i]); + } + } + } + } + + /** * Add more replication work for this datanode. */ synchronized void addBlocksToBeReplicated(DatanodeDescriptor node, @@ -2094,9 +2112,6 @@ if(neededReplications.contains(block)) { neededReplications.update(block, curReplicaDelta, 0); } - if (numCurrentReplica >= fileReplication ) { - pendingReplications.remove(block); - } proccessOverReplicatedBlock( block, fileReplication ); return block; } @@ -2256,6 +2271,7 @@ // Modify the blocks->datanode map and node's map. // node.addBlock( addStoredBlock(block, node) ); + pendingReplications.remove(block); } /** @@ -2606,7 +2622,8 @@ // filter out containingNodes that are marked for decommission. List nodes = filterDecommissionedNodes(containingNodes); - int numCurrentReplica = nodes.size(); + int numCurrentReplica = nodes.size() + + pendingReplications.getNumReplicas(block); DatanodeDescriptor targets[] = replicator.chooseTarget( Math.min( fileINode.getReplication() - numCurrentReplica, needed), @@ -2640,7 +2657,7 @@ if (numCurrentReplica + targets.length >= numExpectedReplica) { neededReplications.remove( block, numCurrentReplica, numExpectedReplica); - pendingReplications.add(block); + pendingReplications.add(block, targets.length); NameNode.stateChangeLog.debug( "BLOCK* NameSystem.pendingTransfer: " + block.getBlockName() Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java?view=auto&rev=512944 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java Wed Feb 28 12:17:05 2007 @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.dfs; + +import org.apache.commons.logging.*; +import org.apache.hadoop.util.*; +import java.util.*; + +/*************************************************** + * PendingReplicationBlocks does the bookkeeping of all + * blocks that are getting replicated. + * + * It does the following: + * 1) record blocks that are getting replicated at this instant. + * 2) a coarse grain timer to track age of replication request + * 3) a thread that periodically identifies replication-requests + * that never made it. + * + * @author Dhruba Borthakur + ***************************************************/ +class PendingReplicationBlocks { + private Log LOG = null; + private Map pendingReplications; + private ArrayList timedOutItems; + Daemon timerThread = null; + private boolean fsRunning = true; + + // + // It might take anywhere between 5 to 10 minutes before + // a request is timed out. + // + private long timeout = 5 * 60 * 1000; + private long defaultRecheckInterval = 5 * 60 * 1000; + + PendingReplicationBlocks(long timeoutPeriod) { + this.timeout = timeoutPeriod; + init(); + } + + PendingReplicationBlocks(Log log) { + this.LOG = log; + init(); + } + + void init() { + pendingReplications = new HashMap(); + timedOutItems = new ArrayList(); + this.timerThread = new Daemon(new PendingReplicationMonitor()); + timerThread.start(); + } + + /** + * Add a block to the list of pending Replications + */ + void add(Block block, int numReplicas) { + synchronized (pendingReplications) { + PendingBlockInfo found = pendingReplications.get(block); + if (found == null) { + pendingReplications.put(block, new PendingBlockInfo(numReplicas)); + } else { + found.incrementReplicas(numReplicas); + found.setTimeStamp(); + } + } + } + + /** + * One replication request for this block has finished. + * Decrement the number of pending replication requests + * for this block. + */ + void remove(Block block) { + synchronized (pendingReplications) { + PendingBlockInfo found = pendingReplications.get(block); + if (found != null) { + found.decrementReplicas(); + if (found.getNumReplicas() <= 0) { + pendingReplications.remove(block); + } + } + } + } + + /** + * The total number of blocks that are undergoing replication + */ + long size() { + return pendingReplications.size(); + } + + /** + * How many copies of this block is pending replication? + */ + int getNumReplicas(Block block) { + synchronized (pendingReplications) { + PendingBlockInfo found = pendingReplications.get(block); + if (found != null) { + return found.getNumReplicas(); + } + } + return 0; + } + + /** + * Returns a list of blocks that have timed out their + * replication requests. Returns null if no blocks have + * timed out. + */ + Block[] getTimedOutBlocks() { + synchronized (timedOutItems) { + if (timedOutItems.size() <= 0) { + return null; + } + Block[] blockList = timedOutItems.toArray( + new Block[timedOutItems.size()]); + timedOutItems.clear(); + return blockList; + } + } + + /** + * An object that contains information about a block that + * is being replicated. It records the timestamp when the + * system started replicating the most recent copy of this + * block. It also records the number of replication + * requests that are in progress. + */ + class PendingBlockInfo { + private long timeStamp; + private int numReplicasInProgress; + + PendingBlockInfo(int numReplicas) { + this.timeStamp = FSNamesystem.now(); + this.numReplicasInProgress = numReplicas; + } + + long getTimeStamp() { + return timeStamp; + } + + void setTimeStamp() { + timeStamp = FSNamesystem.now(); + } + + void incrementReplicas(int increment) { + numReplicasInProgress += increment; + } + + void decrementReplicas() { + numReplicasInProgress--; + assert(numReplicasInProgress >= 0); + } + + int getNumReplicas() { + return numReplicasInProgress; + } + } + + /* + * A periodic thread that scans for blocks that never finished + * their replication request. + */ + class PendingReplicationMonitor implements Runnable { + public void run() { + while (fsRunning) { + long period = Math.min(defaultRecheckInterval, timeout); + try { + pendingReplicationCheck(); + Thread.sleep(period); + } catch (InterruptedException ie) { + if (LOG != null) { + LOG.warn("PendingReplicationMonitor thread received exception. " + + ie); + } + } + } + } + + /** + * Iterate through all items and detect timed-out items + */ + void pendingReplicationCheck() { + synchronized (pendingReplications) { + Iterator iter = pendingReplications.entrySet().iterator(); + long now = FSNamesystem.now(); + while (iter.hasNext()) { + Map.Entry entry = (Map.Entry) iter.next(); + PendingBlockInfo pendingBlock = (PendingBlockInfo) entry.getValue(); + if (now > pendingBlock.getTimeStamp() + timeout) { + Block block = (Block) entry.getKey(); + synchronized (timedOutItems) { + timedOutItems.add(block); + } + if (LOG != null) { + LOG.warn("PendingReplicationMonitor timed out block " + block); + } + iter.remove(); + } + } + } + } + } + + /* + * Shuts down the pending replication monitor thread. + * Waits for the thread to exit. + */ + void stop() { + fsRunning = false; + timerThread.interrupt(); + try { + timerThread.join(3000); + } catch (InterruptedException ie) { + } + } +} Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestPendingReplication.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestPendingReplication.java?view=auto&rev=512944 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestPendingReplication.java (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestPendingReplication.java Wed Feb 28 12:17:05 2007 @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.dfs; + +import junit.framework.TestCase; +import java.lang.System; + +/** + * This class tests the internals of PendingReplicationBlocks.java + * @author Dhruba Borthakur + */ +public class TestPendingReplication extends TestCase { + public void testPendingReplication() { + int timeout = 10; // 10 seconds + PendingReplicationBlocks pendingReplications; + pendingReplications = new PendingReplicationBlocks(timeout * 1000); + + // + // Add 10 blocks to pendingReplciations. + // + for (int i = 0; i < 10; i++) { + Block block = new Block(i, i); + pendingReplications.add(block, i); + } + assertEquals("Size of pendingReplications ", + 10, pendingReplications.size()); + + + // + // remove one item and reinsert it + // + Block blk = new Block(8, 8); + pendingReplications.remove(blk); // removes one replica + assertEquals("pendingReplications.getNumReplicas ", + 7, pendingReplications.getNumReplicas(blk)); + + for (int i = 0; i < 7; i++) { + pendingReplications.remove(blk); // removes all replicas + } + assertTrue(pendingReplications.size() == 9); + pendingReplications.add(blk, 8); + assertTrue(pendingReplications.size() == 10); + + // + // verify that the number of replicas returned + // are sane. + // + for (int i = 0; i < 10; i++) { + Block block = new Block(i, i); + int numReplicas = pendingReplications.getNumReplicas(block); + assertTrue(numReplicas == i); + } + + // + // verify that nothing has timed out so far + // + assertTrue(pendingReplications.getTimedOutBlocks() == null); + + // + // Wait for one second and then insert some more items. + // + try { + Thread.sleep(1000); + } catch (Exception e) { + } + + for (int i = 10; i < 15; i++) { + Block block = new Block(i, i); + pendingReplications.add(block, i); + } + assertTrue(pendingReplications.size() == 15); + + // + // Wait for everything to timeout. + // + int loop = 0; + while (pendingReplications.size() > 0) { + try { + Thread.sleep(1000); + } catch (Exception e) { + } + loop++; + } + System.out.println("Had to wait for " + loop + + " seconds for the lot to timeout"); + + // + // Verify that everything has timed out. + // + assertEquals("Size of pendingReplications ", + 0, pendingReplications.size()); + Block[] timedOut = pendingReplications.getTimedOutBlocks(); + assertTrue(timedOut != null && timedOut.length == 15); + for (int i = 0; i < timedOut.length; i++) { + assertTrue(timedOut[i].getBlockId() < 15); + } + } +}