Return-Path: Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: (qmail 92968 invoked from network); 30 Aug 2010 04:12:14 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 30 Aug 2010 04:12:14 -0000 Received: (qmail 14918 invoked by uid 500); 30 Aug 2010 04:12:14 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 14831 invoked by uid 500); 30 Aug 2010 04:12:11 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 14823 invoked by uid 99); 30 Aug 2010 04:12:10 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 30 Aug 2010 04:12:10 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 30 Aug 2010 04:12:09 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id CBB7123888FD; Mon, 30 Aug 2010 04:10:51 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r990693 - in /hadoop/mapreduce/trunk: CHANGES.txt src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java Date: Mon, 30 Aug 2010 04:10:51 -0000 To: mapreduce-commits@hadoop.apache.org From: dhruba@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100830041051.CBB7123888FD@eris.apache.org> Author: dhruba Date: Mon Aug 30 04:10:51 2010 New Revision: 990693 URL: http://svn.apache.org/viewvc?rev=990693&view=rev Log: MAPREDUCE-1668. RaidNode Hars a directory only if all its parity files have been created. (Ramkumar Vadali via dhruba) Modified: hadoop/mapreduce/trunk/CHANGES.txt hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java Modified: hadoop/mapreduce/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=990693&r1=990692&r2=990693&view=diff ============================================================================== --- hadoop/mapreduce/trunk/CHANGES.txt (original) +++ hadoop/mapreduce/trunk/CHANGES.txt Mon Aug 30 04:10:51 2010 @@ -262,6 +262,9 @@ Trunk (unreleased changes) MAPREDUCE-1670. RAID policies should not scan their own destination path. (Ramkumar Vadali via dhruba) + MAPREDUCE-1668. RaidNode Hars a directory only if all its parity files + have been created. (Ramkumar Vadali via dhruba) + Release 0.21.0 - Unreleased INCOMPATIBLE CHANGES Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java?rev=990693&r1=990692&r2=990693&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java (original) +++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java Mon Aug 30 04:10:51 2010 @@ -1364,7 +1364,8 @@ public class RaidNode implements RaidPro if (stat != null) { LOG.info("Haring parity files for policy " + info.getName() + " " + destPath); - recurseHar(destFs, stat, cutoff, tmpHarPath); + recurseHar(info, destFs, stat, destPref.toUri().getPath(), + srcPath.getFileSystem(conf), cutoff, tmpHarPath); } } } @@ -1380,7 +1381,8 @@ public class RaidNode implements RaidPro return; } - private void recurseHar(FileSystem destFs, FileStatus dest, long cutoff, String tmpHarPath) + private void recurseHar(PolicyInfo info, FileSystem destFs, FileStatus dest, + String destPrefix, FileSystem srcFs, long cutoff, String tmpHarPath) throws IOException { if (dest.isFile()) { @@ -1388,6 +1390,7 @@ public class RaidNode implements RaidPro } Path destPath = dest.getPath(); // pathname, no host:port + String destStr = destPath.toUri().getPath(); // Verify if it already contains a HAR directory if ( destFs.exists(new Path(destPath, destPath.getName()+HAR_SUFFIX)) ) { @@ -1401,13 +1404,34 @@ public class RaidNode implements RaidPro shouldHar = files.length > 0; for (FileStatus one: files) { if (one.isDirectory()){ - recurseHar(destFs, one, cutoff, tmpHarPath); + recurseHar(info, destFs, one, destPrefix, srcFs, cutoff, tmpHarPath); shouldHar = false; } else if (one.getModificationTime() > cutoff ) { shouldHar = false; } } + + if (shouldHar) { + String src = destStr.replaceFirst(destPrefix, ""); + Path srcPath = new Path(src); + FileStatus[] statuses = srcFs.listStatus(srcPath); + Path destPathPrefix = new Path(destPrefix).makeQualified(destFs); + if (statuses != null) { + for (FileStatus status : statuses) { + if (getParityFile(destPathPrefix, + status.getPath().makeQualified(srcFs)) == null ) { + LOG.info("Cannot archive " + destPath + + " because it doesn't contain parity file for " + + status.getPath().makeQualified(srcFs) + " on destination " + + destPathPrefix); + shouldHar = false; + break; + } + } + } + } } + if ( shouldHar ) { singleHar(destFs, dest, tmpHarPath); } Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java?rev=990693&r1=990692&r2=990693&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java (original) +++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java Mon Aug 30 04:10:51 2010 @@ -199,13 +199,15 @@ public class TestRaidHar extends TestCas mySetup(srcPath, targetReplication, metaReplication, stripeLength); RaidShell shell = null; Path dir = new Path("/user/test/raidtest/subdir/"); - Path file1 = new Path(dir + "/file" + iter); RaidNode cnode = null; try { Path destPath = new Path("/destraid/user/test/raidtest/subdir"); fileSys.delete(dir, true); fileSys.delete(destPath, true); - TestRaidNode.createOldFile(fileSys, file1, 1, numBlock, blockSize); + for (int i = 0; i < 10; i++) { + Path file = new Path(dir + "/file" + i); + TestRaidNode.createOldFile(fileSys, file, 1, numBlock, blockSize); + } LOG.info("doTestHar created test files for iteration " + iter); // create an instance of the RaidNode @@ -226,15 +228,28 @@ public class TestRaidHar extends TestCas LOG.info("doTestHar created RaidShell."); FileStatus[] listPaths = null; + int maxFilesFound = 0; // wait till file is raided while (true) { try { listPaths = fileSys.listStatus(destPath); int count = 0; + int filesFound = 0; if (listPaths != null) { for (FileStatus s : listPaths) { LOG.info("doTestHar found path " + s.getPath()); + + if (!s.isDir()) + filesFound++; + if (filesFound > maxFilesFound) + maxFilesFound = filesFound; + if (s.getPath().toString().endsWith(".har")) { + // If a HAR directory is found, ensure that we have seen + // 10 parity files. We have to keep track of the max # of + // files since some parity files might get deleted by the + // purge thread. + assertEquals(10, maxFilesFound); count++; } } @@ -278,8 +293,6 @@ public class TestRaidHar extends TestCas } finally { shell.close(); if (cnode != null) { cnode.stop(); cnode.join(); } - LOG.info("doTestHar delete file " + file1); - fileSys.delete(file1, true); } LOG.info("doTestHar completed:" + " blockSize=" + blockSize + " stripeLength=" + stripeLength);