From: dhruba@apache.org
To: hadoop-commits@lucene.apache.org
Subject: svn commit: r582029 - in /lucene/hadoop/branches/branch-0.14: ./ src/java/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/
Date: Thu, 04 Oct 2007 23:59:02 -0000
Message-Id: <20071004235903.7844A1A9832@eris.apache.org>

Author: dhruba
Date: Thu Oct  4 16:59:00 2007
New Revision: 582029

URL: http://svn.apache.org/viewvc?rev=582029&view=rev
Log:
HADOOP-1955. The Namenode tries not to pick the same source Datanode for
a replication request if an earlier replication request for the same
block from that source Datanode had failed. (Raghu Angadi via dhruba)

This corresponds to changelist 582028 on trunk.

Modified:
    lucene/hadoop/branches/branch-0.14/CHANGES.txt
    lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSDataset.java
    lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java
    lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
    lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/TestReplication.java

Modified: lucene/hadoop/branches/branch-0.14/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/CHANGES.txt?rev=582029&r1=582028&r2=582029&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.14/CHANGES.txt Thu Oct  4 16:59:00 2007
@@ -26,6 +26,11 @@
     HADOOP-1978. Name-node removes edits.new after a successful startup.
     (Konstantin Shvachko via dhruba)
 
+    HADOOP-1955. The Namenode tries not to pick the same source Datanode for
+    a replication request if an earlier replication request for the same
+    block from that source Datanode had failed.
+    (Raghu Angadi via dhruba)
+
 Release 0.14.1 - 2007-09-04
 
 BUG FIXES

Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSDataset.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSDataset.java?rev=582029&r1=582028&r2=582029&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSDataset.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSDataset.java Thu Oct  4 16:59:00 2007
@@ -442,7 +442,9 @@
   private HashMap volumeMap = null;
   private HashMap blockMap = null;
   static Random random = new Random();
-  
+
+  long blockWriteTimeout = 3600 * 1000;
+
   /**
    * An FSDataset has a directory where it loads its data files.
    */
@@ -457,6 +459,8 @@
     volumes.getVolumeMap(volumeMap);
     blockMap = new HashMap();
     volumes.getBlockMap(blockMap);
+    blockWriteTimeout = Math.max(
+        conf.getInt("dfs.datanode.block.write.timeout.sec", 3600), 1) * 1000;
   }
 
   /**
@@ -526,8 +530,9 @@
     //
     if (ongoingCreates.containsKey(b)) {
       // check how old is the temp file - wait 1 hour
-      File tmp = (File)ongoingCreates.get(b);
-      if ((System.currentTimeMillis() - tmp.lastModified()) < 3600 * 1000) {
+      File tmp = ongoingCreates.get(b);
+      if ((System.currentTimeMillis() - tmp.lastModified()) <
+          blockWriteTimeout) {
         throw new IOException("Block " + b +
             " has already been started (though not completed), and thus cannot be created.");
       } else {

Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=582029&r1=582028&r2=582029&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSNamesystem.java Thu Oct  4 16:59:00 2007
@@ -183,7 +183,7 @@
   private long replicationRecheckInterval;
   //decommissionRecheckInterval is how often namenode checks if a node has finished decommission
   private long decommissionRecheckInterval;
-  static int replIndex = 0; // last datanode used for replication work
+  private int replIndex = 0; // last datanode used for replication work
   static int REPL_WORK_PER_ITERATION = 32; // max percent datanodes per iteration
 
   public static FSNamesystem fsNamesystemObject;
@@ -217,7 +217,9 @@
     this.dir.loadFSImage(getNamespaceDirs(conf), startOpt);
     this.safeMode = new SafeModeInfo(conf);
     setBlockTotal();
-    pendingReplications = new PendingReplicationBlocks(LOG);
+    pendingReplications = new PendingReplicationBlocks(LOG,
+        conf.getInt("dfs.replication.pending.timeout.sec",
+                    -1) * 1000);
     this.hbthread = new Daemon(new HeartbeatMonitor());
     this.lmthread = new Daemon(new LeaseMonitor());
     this.replthread = new Daemon(new ReplicationMonitor());
@@ -1886,6 +1888,7 @@
     int numiter = 0;
     int foundwork = 0;
     int hsize = 0;
+    int lastReplIndex = -1;
 
     while (true) {
       DatanodeDescriptor node = null;
@@ -1897,6 +1900,11 @@
       synchronized (heartbeats) {
         hsize = heartbeats.size();
         if (numiter++ >= hsize) {
+          // no change in replIndex.
+          if (lastReplIndex >= 0) {
+            //next time, start after where the last replication was scheduled
+            replIndex = lastReplIndex;
+          }
           break;
         }
         if (replIndex >= hsize) {
@@ -1922,6 +1930,7 @@
           doReplication = true;
           addBlocksToBeReplicated(node, (Block[])replsets[0],
                                   (DatanodeDescriptor[][])replsets[1]);
+          lastReplIndex = replIndex;
         }
       }
       if (!doReplication) {

Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java?rev=582029&r1=582028&r2=582029&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java Thu Oct  4 16:59:00 2007
@@ -49,12 +49,14 @@
   private long defaultRecheckInterval = 5 * 60 * 1000;
 
   PendingReplicationBlocks(long timeoutPeriod) {
-    this.timeout = timeoutPeriod;
-    init();
+    this(null, timeoutPeriod);
   }
 
-  PendingReplicationBlocks(Log log) {
+  PendingReplicationBlocks(Log log, long timeoutPeriod) {
     this.LOG = log;
+    if ( timeoutPeriod > 0 ) {
+      this.timeout = timeoutPeriod;
+    }
     init();
   }

Modified: lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java?rev=582029&r1=582028&r2=582029&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java Thu Oct  4 16:59:00 2007
@@ -113,7 +113,9 @@
       conf.set("dfs.name.dir", new File(base_dir, "name1").getPath()+","+
                new File(base_dir, "name2").getPath());
     }
-    conf.setInt("dfs.replication", Math.min(3, numDataNodes));
+
+    int replication = conf.getInt("dfs.replication", 3);
+    conf.setInt("dfs.replication", Math.min(replication, numDataNodes));
     conf.setInt("dfs.safemode.extension", 0);
     conf.setInt("dfs.namenode.decommission.interval", 3 * 1000); // 3 second

Modified: lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/TestReplication.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/TestReplication.java?rev=582029&r1=582028&r2=582029&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/TestReplication.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/TestReplication.java Thu Oct  4 16:59:00 2007
@@ -19,6 +19,7 @@
 import junit.framework.TestCase;
 
 import java.io.*;
+import java.util.Iterator;
 import java.util.Random;
 import java.net.*;
@@ -166,4 +167,152 @@
       cluster.shutdown();
     }
   }
+
+  // Waits for all of the blocks to have expected replication
+  private void waitForBlockReplication(String filename,
+                                       ClientProtocol namenode,
+                                       int expected, long maxWaitSec)
+                                       throws IOException {
+    long start = System.currentTimeMillis();
+
+    //wait for all the blocks to be replicated;
+    System.out.println("Checking for block replication for " + filename);
+    int iters = 0;
+    while (true) {
+      boolean replOk = true;
+      LocatedBlocks blocks = namenode.getBlockLocations(filename, 0,
+                                                        Long.MAX_VALUE);
+
+      for (Iterator<LocatedBlock> iter = blocks.getLocatedBlocks().iterator();
+           iter.hasNext();) {
+        LocatedBlock block = iter.next();
+        int actual = block.getLocations().length;
+        if ( actual < expected ) {
+          if (true || iters > 0) {
+            System.out.println("Not enough replicas for " + block.getBlock() +
+                               " yet. Expecting " + expected + ", got " +
+                               actual + ".");
+          }
+          replOk = false;
+          break;
+        }
+      }
+
+      if (replOk) {
+        return;
+      }
+
+      iters++;
+
+      if (maxWaitSec > 0 &&
+          (System.currentTimeMillis() - start) > (maxWaitSec * 1000)) {
+        throw new IOException("Timedout while waiting for all blocks to " +
+                              " be replicated for " + filename);
+      }
+
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException ignored) {}
+    }
+  }
+
+  /* This test makes sure that NameNode retries all the available blocks
+   * for under replicated blocks.
+   *
+   * It creates a file with one block and replication of 4. It corrupts
+   * two of the blocks and removes one of the replicas. Expected behaviour is
+   * that missing replica will be copied from one valid source.
+   */
+  public void testPendingReplicationRetry() throws IOException {
+
+    MiniDFSCluster cluster = null;
+    int numDataNodes = 4;
+    String testFile = "/replication-test-file";
+    Path testPath = new Path(testFile);
+
+    byte buffer[] = new byte[1024];
+    for (int i=0; i<buffer.length; i++) {
+      buffer[i] = '1';
+    }
+
+    try {
+      Configuration conf = new Configuration();
+      conf.set("dfs.replication", Integer.toString(numDataNodes));
+      //first time format
+      cluster = new MiniDFSCluster(0, conf, numDataNodes, true,
+                                   true, null, null);
+      cluster.waitActive();
+      DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
+                                            cluster.getNameNodePort()),
+                                          conf);
+
+      OutputStream out = cluster.getFileSystem().create(testPath);
+      out.write(buffer);
+      out.close();
+
+      waitForBlockReplication(testFile, dfsClient.namenode, numDataNodes, -1);
+
+      // get first block of the file.
+      String block = dfsClient.namenode.
+                       getBlockLocations(testFile, 0, Long.MAX_VALUE).
+                       get(0).getBlock().getBlockName();
+
+      cluster.shutdown();
+      cluster = null;
+
+      //Now mess up some of the replicas.
+      //Delete the first and corrupt the next two.
+      File baseDir = new File(System.getProperty("test.build.data"),
+                              "dfs/data");
+      for (int i=0; i<25; i++) {
+        buffer[i] = '0';
+      }
+
+      int fileCount = 0;
+      for (int i=0; i<6; i++) {
+        File blockFile = new File(baseDir, "data" + (i+1) + "/current/" +
+                                  block);
+        System.out.println("Checking for file " + blockFile);
+
+        if (blockFile.exists()) {
+          if (fileCount == 0) {
+            assertTrue(blockFile.delete());
+          } else {
+            // corrupt it.
+            long len = blockFile.length();
+            assertTrue(len > 50);
+            RandomAccessFile blockOut = new RandomAccessFile(blockFile, "rw");
+            blockOut.seek(len/3);
+            blockOut.write(buffer, 0, 25);
+          }
+          fileCount++;
+        }
+      }
+      assertEquals(3, fileCount);
+
+      /* Start the MiniDFSCluster with more datanodes since once a writeBlock
+       * to a datanode node fails, same block can not be written to it
+       * immediately. In our case some replication attempts will fail.
+       */
+      conf = new Configuration();
+      conf.set("dfs.replication", Integer.toString(numDataNodes));
+      conf.set("dfs.replication.pending.timeout.sec", Integer.toString(2));
+      conf.set("dfs.datanode.block.write.timeout.sec", Integer.toString(5));
+      conf.set("dfs.safemode.threshold.pct", "0.75f"); // only 3 copies exist
+
+      cluster = new MiniDFSCluster(0, conf, numDataNodes*2, false,
+                                   true, null, null);
+      cluster.waitActive();
+
+      dfsClient = new DFSClient(new InetSocketAddress("localhost",
+                                  cluster.getNameNodePort()),
+                                conf);
+
+      waitForBlockReplication(testFile, dfsClient.namenode, numDataNodes, -1);
+
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }
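
For anyone trying out the two configuration knobs this change introduces, here is a
minimal sketch. The class name and the chosen values are illustrative only; the two
dfs.* keys and their defaults (3600 and -1) come from the diff above, and
Configuration.setInt/getInt are the stock Hadoop API.

    import org.apache.hadoop.conf.Configuration;

    public class ReplicationTimeoutsExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Datanode abandons a stale temporary block file after this many
        // seconds instead of the previously hard-coded one hour
        // (FSDataset floors the value at 1 second).
        conf.setInt("dfs.datanode.block.write.timeout.sec", 5);

        // Namenode re-queues a replication request that has not completed
        // within this many seconds; -1 (the default) keeps the built-in
        // timeout of PendingReplicationBlocks.
        conf.setInt("dfs.replication.pending.timeout.sec", 2);

        System.out.println("block write timeout (sec)  = " +
            conf.getInt("dfs.datanode.block.write.timeout.sec", 3600));
        System.out.println("pending repl timeout (sec) = " +
            conf.getInt("dfs.replication.pending.timeout.sec", -1));
      }
    }

The test above sets the same two keys to 5 and 2 seconds so that failed replication
attempts time out quickly enough to be retried within the test run.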
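The core idea of HADOOP-1955 is that a replication request which is not confirmed
within a timeout gets handed back so the namenode can pick a different source
datanode on the retry. The following self-contained sketch models only that
bookkeeping; every name in it is invented for illustration, and the real
PendingReplicationBlocks does more (the diff hints at a recheck thread via
defaultRecheckInterval and init()).

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;

    public class PendingReplicationSketch {
      private final Map<String, Long> pending = new HashMap<String, Long>();
      private final long timeoutMillis;

      public PendingReplicationSketch(long timeoutMillis) {
        // Mirrors the patched constructor: a non-positive timeout keeps a default.
        this.timeoutMillis = timeoutMillis > 0 ? timeoutMillis : 5 * 60 * 1000L;
      }

      public synchronized void add(String blockId) {
        // Stamp the block with the time its replication was scheduled.
        pending.put(blockId, System.currentTimeMillis());
      }

      public synchronized void remove(String blockId) {
        // Replication confirmed; forget the request.
        pending.remove(blockId);
      }

      /** Returns blocks whose replication request has timed out; these would
       *  be re-queued so a different source datanode can be chosen. */
      public synchronized List<String> getTimedOut() {
        long now = System.currentTimeMillis();
        List<String> timedOut = new ArrayList<String>();
        for (Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
             it.hasNext();) {
          Map.Entry<String, Long> e = it.next();
          if (now - e.getValue() > timeoutMillis) {
            timedOut.add(e.getKey());
            it.remove();
          }
        }
        return timedOut;
      }

      public static void main(String[] args) throws InterruptedException {
        PendingReplicationSketch p = new PendingReplicationSketch(50);
        p.add("blk_1");
        Thread.sleep(100);
        System.out.println("timed out: " + p.getTimedOut()); // [blk_1]
      }
    }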
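The FSNamesystem hunks also adjust how the replication scan resumes: lastReplIndex
remembers the last datanode that actually received work, and the next pass restarts
from there instead of wherever the scan happened to stop. A toy model of just that
indexing, with invented names and a boolean per datanode standing in for "this node
had replication work":

    public class ReplScanSketch {
      private int replIndex = 0;   // where the next pass starts scanning

      /** One scheduling pass over the datanode list, mirroring the patched loop. */
      public void onePass(boolean[] hasWork) {
        int lastReplIndex = -1;
        for (int numiter = 0; numiter < hasWork.length; numiter++) {
          if (replIndex >= hasWork.length) {
            replIndex = 0;               // wrap around the datanode list
          }
          if (hasWork[replIndex]) {
            lastReplIndex = replIndex;   // remember where work was handed out
          }
          replIndex++;
        }
        if (lastReplIndex >= 0) {
          // next time, start after where the last replication was scheduled
          replIndex = lastReplIndex;
        }
      }

      public static void main(String[] args) {
        ReplScanSketch s = new ReplScanSketch();
        s.onePass(new boolean[] {false, true, false, false});
        // s.replIndex now points at node 1, the last node that got work
      }
    }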