To: mapreduce-commits@hadoop.apache.org
From: schen@apache.org
Reply-To: mapreduce-dev@hadoop.apache.org
Subject: svn commit: r1028580 - in /hadoop/mapreduce/trunk: ./ src/contrib/raid/src/java/org/apache/hadoop/hdfs/ src/contrib/raid/src/java/org/apache/hadoop/raid/ src/contrib/raid/src/test/org/apache/hadoop/raid/
Date: Fri, 29 Oct 2010 01:23:54 -0000
Message-Id: <20101029012354.B3BF123888FE@eris.apache.org>

Author: schen
Date: Fri Oct 29 01:23:54 2010
New Revision: 1028580

URL: http://svn.apache.org/viewvc?rev=1028580&view=rev
Log: RaidNode periodically fixes corrupt blocks. (Ramkumar Vadali via schen)

Added:
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/RaidDFSUtil.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/HarIndex.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1028580&r1=1028579&r2=1028580&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Oct 29 01:23:54 2010
@@ -356,6 +356,9 @@ Trunk (unreleased changes)
     MAPREDUCE-2146. Raid does not affect access time of a source file.
     (Ramkumar Vadali via dhruba)
 
+    MAPREDUCE-2150. RaidNode periodically fixes corrupt blocks.
+    (Ramkumar Vadali via schen)
+
 Release 0.21.1 - Unreleased
 
   NEW FEATURES

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/RaidDFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/RaidDFSUtil.java?rev=1028580&r1=1028579&r2=1028580&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/RaidDFSUtil.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/RaidDFSUtil.java Fri Oct 29 01:23:54 2010
@@ -19,11 +19,21 @@ package org.apache.hadoop.hdfs;
 import java.io.IOException;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
 
 import java.util.LinkedList;
 import java.util.List;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.tools.DFSck;
+import org.apache.hadoop.util.ToolRunner;
 
 public abstract class RaidDFSUtil {
   /**
@@ -49,4 +59,30 @@ public abstract class RaidDFSUtil {
     throws IOException {
     return dfs.getClient().namenode.getBlockLocations(path, offset, length);
   }
+
+  public static String[] getCorruptFiles(Configuration conf)
+    throws IOException {
+    ByteArrayOutputStream baseOut = new ByteArrayOutputStream();
+    PrintStream out = new PrintStream(baseOut, true);
+    DFSck fsck = new DFSck(conf, out);
+    String[] args = new String[]{"-list-corruptfileblocks"};
+    try {
+      ToolRunner.run(fsck, args);
+    } catch (Exception e) {
+      throw new IOException("DFSck.run exception ", e);
+    }
+    byte[] output = baseOut.toByteArray();
+    BufferedReader in = new BufferedReader(new InputStreamReader(
+      new ByteArrayInputStream(output)));
+    String line;
+    Set corruptFiles = new HashSet();
+    while ((line = in.readLine()) != null) {
+      // The interesting lines are of the form: blkidpath
+      int separatorPos = line.indexOf('\t');
+      if (separatorPos != -1) {
+        corruptFiles.add(line.substring(separatorPos + 1));
+      }
+    }
+    return corruptFiles.toArray(new String[corruptFiles.size()]);
+  }
 }

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java?rev=1028580&r1=1028579&r2=1028580&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java Fri Oct 29 01:23:54 2010
@@ -79,7 +79,7 @@ import org.apache.hadoop.raid.RaidUtils;
  * and figures out the location of the bad block by reading through
  * the corrupt file.
*/ -public class BlockFixer { +public class BlockFixer implements Runnable { public static final Log LOG = LogFactory.getLog( "org.apache.hadoop.raid.BlockFixer"); private java.util.HashMap history; @@ -90,6 +90,8 @@ public class BlockFixer { private XOREncoder xorEncoder; private XORDecoder xorDecoder; + boolean running = true; + public BlockFixer(Configuration conf) throws IOException { this.conf = conf; history = new java.util.HashMap(); @@ -101,10 +103,56 @@ public class BlockFixer { xorDecoder = new XORDecoder(conf, stripeLength); } + public void run() { + while (running) { + try { + LOG.info("BlockFixer continuing to run..."); + doFix(); + } catch (Exception e) { + LOG.error(StringUtils.stringifyException(e)); + } catch (Error err) { + LOG.error("Exiting after encountering " + + StringUtils.stringifyException(err)); + throw err; + } + } + } + public long filesFixed() { return numFilesFixed; } + void doFix() throws InterruptedException, IOException { + while (running) { + // Sleep before proceeding to fix files. + Thread.sleep(blockFixInterval); + + // Purge history older than the history interval. + purgeHistory(); + + List corruptFiles = getCorruptFiles(); + if (corruptFiles.isEmpty()) { + // If there are no corrupt files, retry after some time. + continue; + } + LOG.info("Found " + corruptFiles.size() + " corrupt files."); + + sortCorruptFiles(corruptFiles); + + for (Path srcPath: corruptFiles) { + if (!running) break; + try { + fixFile(srcPath); + } catch (IOException ie) { + LOG.error("Hit error while processing " + srcPath + + ": " + StringUtils.stringifyException(ie)); + // Do nothing, move on to the next file. + } + } + } + } + + void fixFile(Path srcPath) throws IOException { if (RaidNode.isParityHarPartFile(srcPath)) { processCorruptParityHarPartFile(srcPath); @@ -169,14 +217,11 @@ public class BlockFixer { List getCorruptFiles() throws IOException { DistributedFileSystem dfs = getDFS(new Path("/")); - // TODO: need an RPC here. 
- // FileStatus[] files = dfs.getClient().namenode.getCorruptFiles(); - FileStatus[] files = new FileStatus[0]; + String[] nnCorruptFiles = RaidDFSUtil.getCorruptFiles(conf); List corruptFiles = new LinkedList(); - for (FileStatus f: files) { - Path p = f.getPath(); - if (!history.containsKey(p.toString())) { - corruptFiles.add(p); + for (String file: nnCorruptFiles) { + if (!history.containsKey(file)) { + corruptFiles.add(new Path(file)); } } RaidUtils.filterTrash(conf, corruptFiles); Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/HarIndex.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/HarIndex.java?rev=1028580&r1=1028579&r2=1028580&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/HarIndex.java (original) +++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/HarIndex.java Fri Oct 29 01:23:54 2010 @@ -102,7 +102,7 @@ public class HarIndex { long startIndex = Long.parseLong(splits[3]); long length = Long.parseLong(splits[4]); String[] newsplits = URLDecoder.decode(splits[5],"UTF-8").split(" "); - if (newsplits != null && newsplits.length >= 5) { + if (newsplits != null && newsplits.length >= 4) { long mtime = Long.parseLong(newsplits[0]); IndexEntry entry = new IndexEntry( name, startIndex, length, mtime, partName); Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java?rev=1028580&r1=1028579&r2=1028580&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java (original) +++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java Fri Oct 29 01:23:54 2010 @@ -110,6 +110,10 @@ public class RaidNode implements RaidPro /** Deamon thread to har raid directories */ Daemon harThread = null; + /** Daemon thread to fix corrupt files */ + BlockFixer blockFixer = null; + Daemon blockFixerThread = null; + /** Daemon thread to monitor distributed raid job progress */ JobMonitor jobMonitor = null; Daemon jobMonitorThread = null; @@ -207,6 +211,7 @@ public class RaidNode implements RaidPro try { if (server != null) server.join(); if (triggerThread != null) triggerThread.join(); + if (blockFixerThread != null) blockFixerThread.join(); if (jobMonitorThread != null) jobMonitorThread.join(); if (purgeThread != null) purgeThread.join(); } catch (InterruptedException ie) { @@ -225,6 +230,8 @@ public class RaidNode implements RaidPro running = false; if (server != null) server.stop(); if (triggerThread != null) triggerThread.interrupt(); + if (blockFixer != null) blockFixer.running = false; + if (blockFixerThread != null) blockFixerThread.interrupt(); if (jobMonitor != null) jobMonitor.running = false; if (jobMonitorThread != null) jobMonitorThread.interrupt(); if (purgeThread != null) purgeThread.interrupt(); @@ -269,6 +276,10 @@ public class RaidNode implements RaidPro running = true; this.server.start(); // start RPC server + this.blockFixer = new BlockFixer(conf); + this.blockFixerThread = new Daemon(this.blockFixer); + this.blockFixerThread.start(); + this.jobMonitor = new JobMonitor(conf); this.jobMonitorThread = new Daemon(this.jobMonitor); 
this.jobMonitorThread.start(); @@ -1074,29 +1085,33 @@ public class RaidNode implements RaidPro if ( shouldHar ) { LOG.info("Archiving " + dest.getPath() + " to " + tmpHarPath ); - singleHar(destFs, dest, tmpHarPath); + singleHar(info, destFs, dest, tmpHarPath); } } - private void singleHar(FileSystem destFs, FileStatus dest, String tmpHarPath) throws IOException { - + private void singleHar(PolicyInfo info, FileSystem destFs, FileStatus dest, + String tmpHarPath) throws IOException { + Random rand = new Random(); Path root = new Path("/"); Path qualifiedPath = dest.getPath().makeQualified(destFs); String harFileDst = qualifiedPath.getName() + HAR_SUFFIX; String harFileSrc = qualifiedPath.getName() + "-" + rand.nextLong() + "-" + HAR_SUFFIX; + short metaReplication = + (short) Integer.parseInt(info.getProperty("metaReplication")); // HadoopArchives.HAR_PARTFILE_LABEL is private, so hard-coding the label. conf.setLong("har.partfile.size", configMgr.getHarPartfileSize()); HadoopArchives har = new HadoopArchives(conf); - String[] args = new String[6]; - args[0] = "-archiveName"; - args[1] = harFileSrc; - args[2] = "-p"; - args[3] = root.makeQualified(destFs).toString(); - args[4] = qualifiedPath.toUri().getPath().substring(1); - args[5] = tmpHarPath.toString(); + String[] args = new String[7]; + args[0] = "-Ddfs.replication=" + metaReplication; + args[1] = "-archiveName"; + args[2] = harFileSrc; + args[3] = "-p"; + args[4] = root.makeQualified(destFs).toString(); + args[5] = qualifiedPath.toUri().getPath().substring(1); + args[6] = tmpHarPath.toString(); int ret = 0; try { ret = ToolRunner.run(har, args); Added: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java?rev=1028580&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java (added) +++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java Fri Oct 29 01:23:54 2010 @@ -0,0 +1,541 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.raid; + +import java.io.File; +import java.io.FileWriter; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.PrintWriter; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.GregorianCalendar; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import java.util.Random; +import java.util.zip.CRC32; + +import junit.framework.TestCase; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FilterFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.mapred.MiniMRCluster; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.DistributedRaidFileSystem; +import org.apache.hadoop.hdfs.RaidDFSUtil; +import org.apache.hadoop.hdfs.TestDatanodeBlockScanner; +import org.apache.hadoop.hdfs.TestRaidDfs; +import org.apache.hadoop.raid.RaidNode; +import org.apache.hadoop.raid.RaidUtils; + + +public class TestBlockFixer extends TestCase { + final static Log LOG = LogFactory.getLog( + "org.apache.hadoop.raid.TestBlockFixer"); + final static String TEST_DIR = new File(System.getProperty("test.build.data", + "build/contrib/raid/test/data")).getAbsolutePath(); + final static String CONFIG_FILE = new File(TEST_DIR, + "test-raid.xml").getAbsolutePath(); + final static long RELOAD_INTERVAL = 1000; + final static int NUM_DATANODES = 3; + Configuration conf; + String namenode = null; + MiniDFSCluster dfs = null; + String hftp = null; + MiniMRCluster mr = null; + FileSystem fileSys = null; + RaidNode cnode = null; + String jobTrackerName = null; + Random rand = new Random(); + + /** + * Test the filtering of trash files from the list of corrupt files. + */ + public void testTrashFilter() { + List files = new LinkedList(); + // Paths that do not match the trash pattern. + Path p1 = new Path("/user/raid/raidtest/f1"); + Path p2 = new Path("/user/.Trash/"); + // Paths that match the trash pattern. + Path p3 = new Path("/user/raid/.Trash/raidtest/f1"); + Path p4 = new Path("/user/raid/.Trash/"); + files.add(p1); + files.add(p3); + files.add(p4); + files.add(p2); + + Configuration conf = new Configuration(); + RaidUtils.filterTrash(conf, files); + + assertEquals(2, files.size()); + for (Path p: files) { + assertTrue(p == p1 || p == p2); + } + } + + /** + * Create a file with three stripes, corrupt a block each in two stripes, + * and wait for the the file to be fixed. 
+ */ + public void testBlockFix() throws Exception { + LOG.info("Test testBlockFix started."); + long blockSize = 8192L; + int stripeLength = 3; + mySetup(stripeLength, -1); + Path file1 = new Path("/user/dhruba/raidtest/file1"); + Path destPath = new Path("/destraid/user/dhruba/raidtest"); + long crc1 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1, + 1, 7, blockSize); + long file1Len = fileSys.getFileStatus(file1).getLen(); + LOG.info("Test testBlockFix created test files"); + + // create an instance of the RaidNode + Configuration localConf = new Configuration(conf); + localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid"); + localConf.setInt("raid.blockfix.interval", 1000); + localConf.setLong("raid.blockfix.filespertask", 2L); + + try { + cnode = RaidNode.createRaidNode(null, localConf); + TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath); + cnode.stop(); cnode.join(); + + FileStatus srcStat = fileSys.getFileStatus(file1); + DistributedFileSystem dfs = (DistributedFileSystem)fileSys; + LocatedBlocks locs = RaidDFSUtil.getBlockLocations( + dfs, file1.toUri().getPath(), 0, srcStat.getLen()); + + String[] corruptFiles = RaidDFSUtil.getCorruptFiles(conf); + assertEquals(corruptFiles.length, 0); + assertEquals(0, cnode.blockFixer.filesFixed()); + + // Corrupt blocks in two different stripes. We can fix them. + int[] corruptBlockIdxs = new int[]{0, 4, 6}; + for (int idx: corruptBlockIdxs) + corruptBlock(locs.get(idx).getBlock().getBlockName()); + reportCorruptBlocks(dfs, file1, corruptBlockIdxs, blockSize); + + corruptFiles = RaidDFSUtil.getCorruptFiles(conf); + assertEquals(corruptFiles.length, 1); + assertEquals(corruptFiles[0], file1.toUri().getPath()); + assertEquals(3, + RaidDFSUtil.corruptBlocksInFile(dfs, file1.toUri().getPath(), 0, + srcStat.getLen()).size()); + + cnode = RaidNode.createRaidNode(null, localConf); + long start = System.currentTimeMillis(); + while (cnode.blockFixer.filesFixed() < 1 && + System.currentTimeMillis() - start < 120000) { + LOG.info("Test testBlockFix waiting for files to be fixed."); + Thread.sleep(1000); + } + assertEquals(1, cnode.blockFixer.filesFixed()); + + dfs = getDFS(conf, dfs); + assertTrue(TestRaidDfs.validateFile(dfs, file1, file1Len, crc1)); + + } catch (Exception e) { + LOG.info("Test testBlockFix Exception " + e + StringUtils.stringifyException(e)); + throw e; + } finally { + myTearDown(); + } + LOG.info("Test testBlockFix completed."); + } + + /** + * Tests integrity of generated block. + * Create a file and delete a block entirely. Wait for the block to be + * regenerated. Now stop RaidNode and corrupt the generated block. + * Test that corruption in the generated block can be detected by clients. 
+ */ + public void testGeneratedBlock() throws Exception { + LOG.info("Test testGeneratedBlock started."); + long blockSize = 8192L; + int stripeLength = 3; + mySetup(stripeLength, -1); + Path file1 = new Path("/user/dhruba/raidtest/file1"); + Path destPath = new Path("/destraid/user/dhruba/raidtest"); + long crc1 = TestRaidDfs.createTestFile(fileSys, file1, 1, 7, blockSize); + long file1Len = fileSys.getFileStatus(file1).getLen(); + LOG.info("Test testGeneratedBlock created test files"); + + // create an instance of the RaidNode + Configuration localConf = new Configuration(conf); + localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid"); + localConf.setInt("raid.blockfix.interval", 1000); + localConf.setLong("raid.blockfix.filespertask", 2L); + try { + cnode = RaidNode.createRaidNode(null, localConf); + TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath); + cnode.stop(); cnode.join(); + + FileStatus srcStat = fileSys.getFileStatus(file1); + DistributedFileSystem dfs = (DistributedFileSystem)fileSys; + LocatedBlocks locs = RaidDFSUtil.getBlockLocations( + dfs, file1.toUri().getPath(), 0, srcStat.getLen()); + + String[] corruptFiles = RaidDFSUtil.getCorruptFiles(conf); + assertEquals(corruptFiles.length, 0); + assertEquals(0, cnode.blockFixer.filesFixed()); + + corruptBlock(locs.get(0).getBlock().getBlockName()); + reportCorruptBlocks(dfs, file1, new int[]{0}, blockSize); + + corruptFiles = RaidDFSUtil.getCorruptFiles(conf); + assertEquals(corruptFiles.length, 1); + assertEquals(corruptFiles[0], file1.toUri().getPath()); + + cnode = RaidNode.createRaidNode(null, localConf); + long start = System.currentTimeMillis(); + while (cnode.blockFixer.filesFixed() < 1 && + System.currentTimeMillis() - start < 120000) { + LOG.info("Test testGeneratedBlock waiting for files to be fixed."); + Thread.sleep(1000); + } + assertEquals(1, cnode.blockFixer.filesFixed()); + + // Stop RaidNode + cnode.stop(); cnode.join(); cnode = null; + + // The block has successfully been reconstructed. + dfs = getDFS(conf, dfs); + assertTrue(TestRaidDfs.validateFile(dfs, file1, file1Len, crc1)); + + // Now corrupt the generated block. + locs = RaidDFSUtil.getBlockLocations( + dfs, file1.toUri().getPath(), 0, srcStat.getLen()); + corruptBlock(locs.get(0).getBlock().getBlockName()); + reportCorruptBlocks(dfs, file1, new int[]{0}, blockSize); + + // This should fail. + boolean caughtChecksumException = false; + try { + Thread.sleep(5*1000); + } catch (InterruptedException ignore) { + } + try { + TestRaidDfs.validateFile(dfs, file1, file1Len, crc1); + } catch (org.apache.hadoop.fs.ChecksumException ce) { + caughtChecksumException = true; + } + assertTrue(caughtChecksumException); + } catch (Exception e) { + LOG.info("Test testGeneratedBlock Exception " + e + StringUtils.stringifyException(e)); + throw e; + } finally { + myTearDown(); + } + LOG.info("Test testGeneratedBlock completed."); + } + + /** + * Corrupt a parity file and wait for it to get fixed. 
+ */ + public void testParityBlockFix() throws Exception { + LOG.info("Test testParityBlockFix started."); + long blockSize = 8192L; + int stripeLength = 3; + mySetup(stripeLength, -1); + Path file1 = new Path("/user/dhruba/raidtest/file1"); + Path destPath = new Path("/destraid/user/dhruba/raidtest"); + Path parityFile = new Path("/destraid/user/dhruba/raidtest/file1"); + TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1, + 1, 7, blockSize); + long file1Len = fileSys.getFileStatus(file1).getLen(); + LOG.info("Test testParityBlockFix created test files"); + + // create an instance of the RaidNode + Configuration localConf = new Configuration(conf); + localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid"); + localConf.setInt("raid.blockfix.interval", 1000); + localConf.setLong("raid.blockfix.filespertask", 2L); + + try { + cnode = RaidNode.createRaidNode(null, localConf); + TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath); + cnode.stop(); cnode.join(); + + long parityCRC = getCRC(fileSys, parityFile); + + FileStatus parityStat = fileSys.getFileStatus(parityFile); + DistributedFileSystem dfs = (DistributedFileSystem)fileSys; + LocatedBlocks locs = RaidDFSUtil.getBlockLocations( + dfs, parityFile.toUri().getPath(), 0, parityStat.getLen()); + + String[] corruptFiles = RaidDFSUtil.getCorruptFiles(conf); + assertEquals(corruptFiles.length, 0); + assertEquals(0, cnode.blockFixer.filesFixed()); + + // Corrupt parity blocks for different stripes. + int[] corruptBlockIdxs = new int[]{0, 1, 2}; + for (int idx: corruptBlockIdxs) + corruptBlock(locs.get(idx).getBlock().getBlockName()); + reportCorruptBlocks(dfs, parityFile, corruptBlockIdxs, blockSize); + + corruptFiles = RaidDFSUtil.getCorruptFiles(conf); + assertEquals(corruptFiles.length, 1); + assertEquals(corruptFiles[0], parityFile.toUri().getPath()); + + cnode = RaidNode.createRaidNode(null, localConf); + long start = System.currentTimeMillis(); + while (cnode.blockFixer.filesFixed() < 1 && + System.currentTimeMillis() - start < 120000) { + LOG.info("Test testParityBlockFix waiting for files to be fixed."); + Thread.sleep(1000); + } + assertEquals(1, cnode.blockFixer.filesFixed()); + + long checkCRC = getCRC(fileSys, parityFile); + + assertEquals(parityCRC, checkCRC); + + } catch (Exception e) { + LOG.info("Test testParityBlockFix Exception " + e + StringUtils.stringifyException(e)); + throw e; + } finally { + myTearDown(); + } + LOG.info("Test testParityBlockFix completed."); + } + + public void testParityHarBlockFix() throws Exception { + LOG.info("Test testParityHarBlockFix started."); + long blockSize = 8192L; + int stripeLength = 3; + mySetup(stripeLength, 0); // Time before har = 0 days. + Path file1 = new Path("/user/dhruba/raidtest/file1"); + Path destPath = new Path("/destraid/user/dhruba/raidtest"); + // Parity file will have 7 blocks. + TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1, + 1, 20, blockSize); + long file1Len = fileSys.getFileStatus(file1).getLen(); + LOG.info("Test testParityHarBlockFix created test files"); + + // create an instance of the RaidNode + // HAR block size = 2 * src block size = 2 * parity block size. 
+ Configuration localConf = new Configuration(conf); + localConf.setLong("har.block.size", blockSize * 2); + localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid"); + localConf.setInt("raid.blockfix.interval", 1000); + localConf.setLong("raid.blockfix.filespertask", 2L); + + try { + cnode = RaidNode.createRaidNode(null, localConf); + Path harDirectory = + new Path("/destraid/user/dhruba/raidtest/raidtest" + RaidNode.HAR_SUFFIX); + long start = System.currentTimeMillis(); + while (System.currentTimeMillis() - start < 1000 * 120) { + if (fileSys.exists(harDirectory)) { + break; + } + LOG.info("Test testParityHarBlockFix waiting for har"); + Thread.sleep(1000); + } + + Path partFile = new Path(harDirectory, "part-0"); + long partCRC = getCRC(fileSys, partFile); + FileStatus partStat = fileSys.getFileStatus(partFile); + DistributedFileSystem dfs = (DistributedFileSystem)fileSys; + LocatedBlocks locs = RaidDFSUtil.getBlockLocations( + dfs, partFile.toUri().getPath(), 0, partStat.getLen()); + // 7 parity blocks => 4 har blocks. + assertEquals(4, locs.getLocatedBlocks().size()); + cnode.stop(); cnode.join(); + + String[] corruptFiles = RaidDFSUtil.getCorruptFiles(conf); + assertEquals(corruptFiles.length, 0); + assertEquals(0, cnode.blockFixer.filesFixed()); + + // Corrupt parity blocks for different stripes. + int[] corruptBlockIdxs = new int[]{0, 3}; + for (int idx: corruptBlockIdxs) + corruptBlock(locs.get(idx).getBlock().getBlockName()); + reportCorruptBlocks(dfs, partFile, corruptBlockIdxs, + partStat.getBlockSize()); + + corruptFiles = RaidDFSUtil.getCorruptFiles(conf); + assertEquals(corruptFiles.length, 1); + assertEquals(corruptFiles[0], partFile.toUri().getPath()); + + cnode = RaidNode.createRaidNode(null, localConf); + start = System.currentTimeMillis(); + while (cnode.blockFixer.filesFixed() < 1 && + System.currentTimeMillis() - start < 120000) { + LOG.info("Test testParityHarBlockFix waiting for files to be fixed."); + Thread.sleep(1000); + } + assertEquals(1, cnode.blockFixer.filesFixed()); + + long checkCRC = getCRC(fileSys, partFile); + + assertEquals(partCRC, checkCRC); + } catch (Exception e) { + LOG.info("Test testParityHarBlockFix Exception " + e + StringUtils.stringifyException(e)); + throw e; + } finally { + myTearDown(); + } + LOG.info("Test testParityHarBlockFix completed."); + } + + private static DistributedFileSystem getDFS( + Configuration conf, FileSystem dfs) throws IOException { + Configuration clientConf = new Configuration(conf); + clientConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem"); + clientConf.setBoolean("fs.hdfs.impl.disable.cache", true); + URI dfsUri = dfs.getUri(); + FileSystem.closeAll(); + return (DistributedFileSystem) FileSystem.get(dfsUri, clientConf); + } + + private void mySetup(int stripeLength, int timeBeforeHar) throws Exception { + + new File(TEST_DIR).mkdirs(); // Make sure data directory exists + conf = new Configuration(); + + conf.set("raid.config.file", CONFIG_FILE); + conf.setBoolean("raid.config.reload", true); + conf.setLong("raid.config.reload.interval", RELOAD_INTERVAL); + + // scan all policies once every 5 second + conf.setLong("raid.policy.rescan.interval", 5000); + + // make all deletions not go through Trash + conf.set("fs.shell.delete.classname", "org.apache.hadoop.hdfs.DFSClient"); + + // do not use map-reduce cluster for Raiding + conf.setBoolean("fs.raidnode.local", true); + conf.set("raid.server.address", "localhost:0"); + conf.setInt("hdfs.raid.stripeLength", stripeLength); + 
conf.set("hdfs.raid.locs", "/destraid"); + + dfs = new MiniDFSCluster(conf, NUM_DATANODES, true, null); + dfs.waitActive(); + fileSys = dfs.getFileSystem(); + namenode = fileSys.getUri().toString(); + + FileSystem.setDefaultUri(conf, namenode); + mr = new MiniMRCluster(4, namenode, 3); + jobTrackerName = "localhost:" + mr.getJobTrackerPort(); + hftp = "hftp://localhost.localdomain:" + dfs.getNameNodePort(); + + FileSystem.setDefaultUri(conf, namenode); + conf.set("mapred.job.tracker", jobTrackerName); + + FileWriter fileWriter = new FileWriter(CONFIG_FILE); + fileWriter.write("\n"); + String str = " " + + " " + + " " + + "xor " + + " /destraid " + + " " + + "targetReplication " + + "1 " + + "after RAIDing, decrease the replication factor of a file to this value." + + " " + + " " + + " " + + "metaReplication " + + "1 " + + " replication factor of parity file" + + " " + + " " + + " " + + "modTimePeriod " + + "2000 " + + " time (milliseconds) after a file is modified to make it " + + "a candidate for RAIDing " + + " " + + " "; + if (timeBeforeHar >= 0) { + str += + " " + + "time_before_har " + + "" + timeBeforeHar + " " + + " amount of time waited before har'ing parity files" + + " " + + " "; + } + + str += + "" + + "" + + ""; + fileWriter.write(str); + fileWriter.close(); + } + + private void myTearDown() throws Exception { + if (cnode != null) { cnode.stop(); cnode.join(); } + if (mr != null) { mr.shutdown(); } + if (dfs != null) { dfs.shutdown(); } + } + + private long getCRC(FileSystem fs, Path p) throws IOException { + CRC32 crc = new CRC32(); + FSDataInputStream stm = fs.open(p); + for (int b = 0; b > 0; b = stm.read()) { + crc.update(b); + } + stm.close(); + return crc.getValue(); + } + + void corruptBlock(String blockName) throws IOException { + boolean corrupted = false; + for (int i = 0; i < NUM_DATANODES; i++) { + corrupted |= TestDatanodeBlockScanner.corruptReplica(blockName, i); + } + assertTrue(corrupted); + } + + void reportCorruptBlocks(FileSystem fs, Path file, int[] idxs, + long blockSize) throws IOException { + + FSDataInputStream in = fs.open(file); + for (int idx: idxs) { + long offset = idx * blockSize; + LOG.info("Reporting corrupt block " + file + ":" + offset); + in.seek(offset); + try { + in.readFully(new byte[(int)blockSize]); + fail("Expected exception not thrown for " + file + ":" + offset); + } catch (org.apache.hadoop.fs.ChecksumException e) { + } + } + } +} +
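A quick way to exercise the new corrupt-file listing outside of RaidNode is the minimal sketch below. It is illustrative only and not part of the patch: the class name ListCorruptFiles is invented here, and it assumes an HDFS client configuration (plus the raid contrib classes) on the classpath and a reachable namenode.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.RaidDFSUtil;

  public class ListCorruptFiles {
    public static void main(String[] args) throws Exception {
      // Picks up fs.default.name and the rest of the client settings
      // from the configuration files on the classpath.
      Configuration conf = new Configuration();
      // Runs fsck with -list-corruptfileblocks under the hood and returns
      // the corrupt file paths parsed from its tab-separated output.
      String[] corrupt = RaidDFSUtil.getCorruptFiles(conf);
      for (String path : corrupt) {
        System.out.println("corrupt: " + path);
      }
    }
  }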
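RaidNode wires the fixer in as a Daemon wrapped around the now-Runnable BlockFixer and shuts it down by clearing the running flag and interrupting the thread. The sketch below drives the same lifecycle standalone; it is an illustration under these assumptions: the class sits in the org.apache.hadoop.raid package (the running flag added by this patch is package-private), a namenode is reachable via the default configuration, and BlockFixerDriver is an invented name.

  package org.apache.hadoop.raid;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.util.Daemon;

  public class BlockFixerDriver {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // Same key the new tests set; controls the sleep between scans in doFix().
      conf.setInt("raid.blockfix.interval", 1000);

      BlockFixer blockFixer = new BlockFixer(conf);
      Daemon blockFixerThread = new Daemon(blockFixer); // same wiring as RaidNode
      blockFixerThread.start();

      Thread.sleep(60000); // let it scan and fix for a minute
      System.out.println("files fixed: " + blockFixer.filesFixed());

      // Shutdown mirrors RaidNode.stop(): clear the flag, interrupt, join.
      blockFixer.running = false;
      blockFixerThread.interrupt();
      blockFixerThread.join();
    }
  }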
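The singleHar() change forces the replication factor of the archive part files by passing -Ddfs.replication as a generic option; ToolRunner's GenericOptionsParser applies it to the tool's configuration before HadoopArchives runs. The standalone sketch below shows the same invocation pattern; the archive name and paths are placeholders, not values taken from the patch.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.tools.HadoopArchives;
  import org.apache.hadoop.util.ToolRunner;

  public class HarWithMetaReplication {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      HadoopArchives har = new HadoopArchives(conf);
      // -D options are stripped and applied to conf before har.run() is called,
      // so the part files of the archive are written with replication 1 here.
      String[] harArgs = new String[] {
        "-Ddfs.replication=1",
        "-archiveName", "example.har",
        "-p", "/user/raid",
        "raidtest",
        "/tmp/raid-har"
      };
      System.exit(ToolRunner.run(har, harArgs));
    }
  }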