From: dhruba@apache.org
To: mapreduce-commits@hadoop.apache.org
Date: Tue, 30 Nov 2010 06:23:56 -0000
Subject: svn commit: r1040417 [1/2] - in /hadoop/mapreduce/trunk: ./ src/contrib/raid/src/java/org/apache/hadoop/hdfs/ src/contrib/raid/src/java/org/apache/hadoop/raid/ src/contrib/raid/src/test/org/apache/hadoop/raid/

Author: dhruba
Date: Tue Nov 30 06:23:55 2010
New Revision: 1040417

URL: http://svn.apache.org/viewvc?rev=1040417&view=rev
Log:
MAPREDUCE-2155. RaidNode should optionally use the mapreduce jobs to
fix missing blocks. (Patrick Kling via dhruba)

Added:
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistBlockFixer.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/LocalBlockFixer.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/RaidDFSUtil.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonDecoder.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1040417&r1=1040416&r2=1040417&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Nov 30 06:23:55 2010
@@ -22,6 +22,9 @@ Trunk (unreleased changes)
    MAPREDUCE-1752. Implement getFileBlockLocations in HarFilesystem.
    (Patrick Kling via dhruba)

+    MAPREDUCE-2155.
RaidNode should optionally use the mapreduce jobs to + fix missing blocks. (Patrick Kling via dhruba) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/RaidDFSUtil.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/RaidDFSUtil.java?rev=1040417&r1=1040416&r2=1040417&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/RaidDFSUtil.java (original) +++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/RaidDFSUtil.java Tue Nov 30 06:23:55 2010 @@ -30,6 +30,7 @@ import java.util.HashSet; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.CorruptFileBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.tools.DFSck; @@ -60,29 +61,24 @@ public abstract class RaidDFSUtil { return dfs.getClient().namenode.getBlockLocations(path, offset, length); } - public static String[] getCorruptFiles(Configuration conf) + /** + * Make successive calls to listCorruptFiles to obtain all + * corrupt files. + */ + public static String[] getCorruptFiles(DistributedFileSystem dfs) throws IOException { - ByteArrayOutputStream baseOut = new ByteArrayOutputStream(); - PrintStream out = new PrintStream(baseOut, true); - DFSck fsck = new DFSck(conf, out); - String[] args = new String[]{"-list-corruptfileblocks"}; - try { - ToolRunner.run(fsck, args); - } catch (Exception e) { - throw new IOException("DFSck.run exception ", e); - } - byte[] output = baseOut.toByteArray(); - BufferedReader in = new BufferedReader(new InputStreamReader( - new ByteArrayInputStream(output))); - String line; Set corruptFiles = new HashSet(); - while ((line = in.readLine()) != null) { - // The interesting lines are of the form: blkidpath - int separatorPos = line.indexOf('\t'); - if (separatorPos != -1) { - corruptFiles.add(line.substring(separatorPos + 1)); + + String cookie = null; + for (CorruptFileBlocks fbs = dfs.listCorruptFileBlocks("/", cookie); + fbs.getFiles().length > 0; + fbs = dfs.listCorruptFileBlocks("/", cookie)) { + for (String path : fbs.getFiles()) { + corruptFiles.add(path); } + cookie = fbs.getCookie(); } + return corruptFiles.toArray(new String[corruptFiles.size()]); } } Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java?rev=1040417&r1=1040416&r2=1040417&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java (original) +++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java Tue Nov 30 06:23:55 2010 @@ -42,6 +42,8 @@ import java.util.Random; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.channels.SocketChannel; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -64,9 +66,9 @@ import org.apache.hadoop.fs.ChecksumExce import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileStatus; 
import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hdfs.BlockMissingException; import org.apache.hadoop.hdfs.RaidDFSUtil; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Progressable; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.raid.RaidNode; @@ -77,6 +79,10 @@ import org.apache.hadoop.raid.protocol.P /** * contains the core functionality of the block fixer * + * configuration options: + * raid.blockfix.classname - the class name of the block fixer + * implementation to use + * * raid.blockfix.interval - interval between checks for corrupt files * * raid.blockfix.history.interval - interval before fixing same file again @@ -85,10 +91,9 @@ import org.apache.hadoop.raid.protocol.P * * raid.blockfix.write.timeout - write time out */ -public class BlockFixer extends Configured implements Runnable { - public static final Log LOG = LogFactory.getLog( - "org.apache.hadoop.raid.BlockFixer"); +public abstract class BlockFixer extends Configured implements Runnable { + public static final String BLOCKFIX_CLASSNAME = "raid.blockfix.classname"; public static final String BLOCKFIX_INTERVAL = "raid.blockfix.interval"; public static final String BLOCKFIX_HISTORY_INTERVAL = "raid.blockfix.history.interval"; @@ -101,661 +106,737 @@ public class BlockFixer extends Configur public static final long DEFAULT_BLOCKFIX_HISTORY_INTERVAL = 60 * 60 * 1000; // 60 mins - private java.util.HashMap history; + public static BlockFixer createBlockFixer(Configuration conf) + throws ClassNotFoundException { + try { + // default to distributed block fixer + Class blockFixerClass = + conf.getClass(BLOCKFIX_CLASSNAME, DistBlockFixer.class); + if (!BlockFixer.class.isAssignableFrom(blockFixerClass)) { + throw new ClassNotFoundException("not an implementation of blockfixer"); + } + Constructor constructor = + blockFixerClass.getConstructor(new Class[] {Configuration.class} ); + return (BlockFixer) constructor.newInstance(conf); + } catch (NoSuchMethodException e) { + throw new ClassNotFoundException("cannot construct blockfixer", e); + } catch (InstantiationException e) { + throw new ClassNotFoundException("cannot construct blockfixer", e); + } catch (IllegalAccessException e) { + throw new ClassNotFoundException("cannot construct blockfixer", e); + } catch (InvocationTargetException e) { + throw new ClassNotFoundException("cannot construct blockfixer", e); + } + } + private long numFilesFixed = 0; - private String xorPrefix; - private String rsPrefix; - private Encoder xorEncoder; - private Decoder xorDecoder; - private Encoder rsEncoder; - private Decoder rsDecoder; + + public volatile boolean running = true; // interval between checks for corrupt files - protected long blockFixInterval = DEFAULT_BLOCKFIX_INTERVAL; + protected long blockFixInterval; // interval before fixing same file again - protected long historyInterval = DEFAULT_BLOCKFIX_HISTORY_INTERVAL; - - public volatile boolean running = true; + protected long historyInterval; - - public BlockFixer(Configuration conf) throws IOException { + public BlockFixer(Configuration conf) { super(conf); - history = new java.util.HashMap(); - blockFixInterval = getConf().getInt(BLOCKFIX_INTERVAL, - (int) blockFixInterval); - xorPrefix = RaidNode.xorDestinationPath(getConf()).toUri().getPath(); - if (!xorPrefix.endsWith(Path.SEPARATOR)) { - xorPrefix += Path.SEPARATOR; - } - int stripeLength = RaidNode.getStripeLength(getConf()); - xorEncoder = new XOREncoder(getConf(), stripeLength); - xorDecoder = new 
XORDecoder(getConf(), stripeLength); - rsPrefix = RaidNode.rsDestinationPath(getConf()).toUri().getPath(); - if (!rsPrefix.endsWith(Path.SEPARATOR)) { - rsPrefix += Path.SEPARATOR; - } - int parityLength = RaidNode.rsParityLength(getConf()); - rsEncoder = new ReedSolomonEncoder(getConf(), stripeLength, parityLength); - rsDecoder = new ReedSolomonDecoder(getConf(), stripeLength, parityLength); - } + blockFixInterval = + getConf().getLong(BLOCKFIX_INTERVAL, DEFAULT_BLOCKFIX_INTERVAL); + historyInterval = + getConf().getLong(BLOCKFIX_HISTORY_INTERVAL, + DEFAULT_BLOCKFIX_HISTORY_INTERVAL); - public void run() { - while (running) { - try { - LOG.info("BlockFixer continuing to run..."); - doFix(); - } catch (Exception e) { - LOG.error(StringUtils.stringifyException(e)); - } catch (Error err) { - LOG.error("Exiting after encountering " + - StringUtils.stringifyException(err)); - throw err; - } - } } - public long filesFixed() { + @Override + public abstract void run(); + + /** + * returns the number of files that have been fixed by this block fixer + */ + public synchronized long filesFixed() { return numFilesFixed; } - void doFix() throws InterruptedException, IOException { - while (running) { - // Sleep before proceeding to fix files. - Thread.sleep(blockFixInterval); + /** + * increments the number of files that have been fixed by this block fixer + */ + protected synchronized void incrFilesFixed() { + numFilesFixed++; + } - // Purge history older than the history interval. - purgeHistory(); + /** + * increments the number of files that have been fixed by this block fixer + */ + protected synchronized void incrFilesFixed(long incr) { + if (incr < 0) { + throw new IllegalArgumentException("cannot increment by negative value " + + incr); + } + + numFilesFixed += incr; + } - List corruptFiles = getCorruptFiles(); - if (corruptFiles.isEmpty()) { - // If there are no corrupt files, retry after some time. - continue; + static boolean isSourceFile(Path p, String[] destPrefixes) { + String pathStr = p.toUri().getPath(); + for (String destPrefix: destPrefixes) { + if (pathStr.startsWith(destPrefix)) { + return false; } - LOG.info("Found " + corruptFiles.size() + " corrupt files."); - - sortCorruptFiles(corruptFiles); + } + return true; + } - for (Path srcPath: corruptFiles) { - if (!running) break; - try { - fixFile(srcPath); - } catch (IOException ie) { - LOG.error("Hit error while processing " + srcPath + - ": " + StringUtils.stringifyException(ie)); - // Do nothing, move on to the next file. - } + void filterUnfixableSourceFiles(Iterator it) throws IOException { + String xorPrefix = RaidNode.xorDestinationPath(getConf()).toUri().getPath(); + if (!xorPrefix.endsWith(Path.SEPARATOR)) { + xorPrefix += Path.SEPARATOR; + } + String rsPrefix = RaidNode.rsDestinationPath(getConf()).toUri().getPath(); + if (!rsPrefix.endsWith(Path.SEPARATOR)) { + rsPrefix += Path.SEPARATOR; + } + String[] destPrefixes = new String[]{xorPrefix, rsPrefix}; + while (it.hasNext()) { + Path p = it.next(); + if (isSourceFile(p, destPrefixes) && + RaidNode.xorParityForSource(p, getConf()) == null && + RaidNode.rsParityForSource(p, getConf()) == null) { + it.remove(); } } } + /** + * this class implements the actual fixing functionality + * we keep this in a separate class so that + * the distributed block fixer can use it + */ + static class BlockFixerHelper extends Configured { - void fixFile(Path srcPath) throws IOException { + public static final Log LOG = LogFactory.getLog(BlockFixer. 
+ BlockFixerHelper.class); - if (RaidNode.isParityHarPartFile(srcPath)) { - processCorruptParityHarPartFile(srcPath); - return; - } + private String xorPrefix; + private String rsPrefix; + private XOREncoder xorEncoder; + private XORDecoder xorDecoder; + private ReedSolomonEncoder rsEncoder; + private ReedSolomonDecoder rsDecoder; - // The corrupted file is a XOR parity file - if (isXorParityFile(srcPath)) { - processCorruptParityFile(srcPath, xorEncoder); - return; - } + public BlockFixerHelper(Configuration conf) throws IOException { + super(conf); + + xorPrefix = RaidNode.xorDestinationPath(getConf()).toUri().getPath(); + if (!xorPrefix.endsWith(Path.SEPARATOR)) { + xorPrefix += Path.SEPARATOR; + } + rsPrefix = RaidNode.rsDestinationPath(getConf()).toUri().getPath(); + if (!rsPrefix.endsWith(Path.SEPARATOR)) { + rsPrefix += Path.SEPARATOR; + } + int stripeLength = RaidNode.getStripeLength(getConf()); + xorEncoder = new XOREncoder(getConf(), stripeLength); + xorDecoder = new XORDecoder(getConf(), stripeLength); + int parityLength = RaidNode.rsParityLength(getConf()); + rsEncoder = new ReedSolomonEncoder(getConf(), stripeLength, parityLength); + rsDecoder = new ReedSolomonDecoder(getConf(), stripeLength, parityLength); - // The corrupted file is a ReedSolomon parity file - if (isRsParityFile(srcPath)) { - processCorruptParityFile(srcPath, rsEncoder); - return; } - // The corrupted file is a source file - RaidNode.ParityFilePair ppair = - RaidNode.xorParityForSource(srcPath, getConf()); - Decoder decoder = null; - if (ppair != null) { - decoder = xorDecoder; - } else { - ppair = RaidNode.rsParityForSource(srcPath, getConf()); - if (ppair != null) { - decoder = rsDecoder; + /** + * checks whether file is xor parity file + */ + boolean isXorParityFile(Path p) { + String pathStr = p.toUri().getPath(); + if (pathStr.contains(RaidNode.HAR_SUFFIX)) { + return false; } + return pathStr.startsWith(xorPrefix); } - // If we have a parity file, process the file and fix it. - if (ppair != null) { - processCorruptFile(srcPath, ppair, decoder); + /** + * checks whether file is rs parity file + */ + boolean isRsParityFile(Path p) { + String pathStr = p.toUri().getPath(); + if (pathStr.contains(RaidNode.HAR_SUFFIX)) { + return false; + } + return pathStr.startsWith(rsPrefix); } - } - - /** - * We maintain history of fixed files because a fixed file may appear in - * the list of corrupt files if we loop around too quickly. - * This function removes the old items in the history so that we can - * recognize files that have actually become corrupt since being fixed. - */ - void purgeHistory() { - // Default history interval is 1 hour. - long historyInterval = getConf().getLong( - BLOCKFIX_HISTORY_INTERVAL, 3600*1000); - java.util.Date cutOff = new java.util.Date( - System.currentTimeMillis()-historyInterval); - List toRemove = new java.util.ArrayList(); - - for (String key: history.keySet()) { - java.util.Date item = history.get(key); - if (item.before(cutOff)) { - toRemove.add(key); - } - } - for (String key: toRemove) { - LOG.info("Removing " + key + " from history"); - history.remove(key); + /** + * Fix a file, do not report progess. + * + * @return true if file has been fixed, false if no fixing + * was necessary or possible. 
+ */ + boolean fixFile(Path srcPath) throws IOException { + return fixFile(srcPath, new RaidUtils.DummyProgressable()); } - } - /** - * @return A list of corrupt files as obtained from the namenode - */ - List getCorruptFiles() throws IOException { - DistributedFileSystem dfs = getDFS(new Path("/")); + /** + * Fix a file, report progess. + * + * @return true if file has been fixed, false if no fixing + * was necessary or possible. + */ + boolean fixFile(Path srcPath, Progressable progress) throws IOException { - String[] nnCorruptFiles = RaidDFSUtil.getCorruptFiles(getConf()); - List corruptFiles = new LinkedList(); - for (String file: nnCorruptFiles) { - if (!history.containsKey(file)) { - corruptFiles.add(new Path(file)); + if (RaidNode.isParityHarPartFile(srcPath)) { + return processCorruptParityHarPartFile(srcPath, progress); } - } - RaidUtils.filterTrash(getConf(), corruptFiles); - return corruptFiles; - } - /** - * Sorts source files ahead of parity files. - */ - void sortCorruptFiles(List files) { - // TODO: We should first fix the files that lose more blocks - Comparator comp = new Comparator() { - public int compare(Path p1, Path p2) { - if (isXorParityFile(p2) || isRsParityFile(p2)) { - // If p2 is a parity file, p1 is smaller. - return -1; - } - if (isXorParityFile(p1) || isRsParityFile(p1)) { - // If p1 is a parity file, p2 is smaller. - return 1; - } - // If both are source files, they are equal. - return 0; + // The corrupted file is a XOR parity file + if (isXorParityFile(srcPath)) { + return processCorruptParityFile(srcPath, xorEncoder, progress); } - }; - Collections.sort(files, comp); - } - /** - * Reads through a corrupt source file fixing corrupt blocks on the way. - * @param srcPath Path identifying the corrupt file. - * @throws IOException - */ - void processCorruptFile(Path srcPath, RaidNode.ParityFilePair parityPair, - Decoder decoder) throws IOException { - LOG.info("Processing corrupt file " + srcPath); - - DistributedFileSystem srcFs = getDFS(srcPath); - FileStatus srcStat = srcFs.getFileStatus(srcPath); - long blockSize = srcStat.getBlockSize(); - long srcFileSize = srcStat.getLen(); - String uriPath = srcPath.toUri().getPath(); - - int numBlocksFixed = 0; - List corrupt = - RaidDFSUtil.corruptBlocksInFile(srcFs, uriPath, 0, srcFileSize); - for (LocatedBlock lb: corrupt) { - Block corruptBlock = lb.getBlock(); - long corruptOffset = lb.getStartOffset(); - - LOG.info("Found corrupt block " + corruptBlock + - ", offset " + corruptOffset); - - final long blockContentsSize = - Math.min(blockSize, srcFileSize - corruptOffset); - File localBlockFile = - File.createTempFile(corruptBlock.getBlockName(), ".tmp"); - localBlockFile.deleteOnExit(); - - try { - decoder.recoverBlockToFile(srcFs, srcPath, parityPair.getFileSystem(), - parityPair.getPath(), blockSize, corruptOffset, localBlockFile, - blockContentsSize); - - // We have a the contents of the block, send them. 
- DatanodeInfo datanode = chooseDatanode(lb.getLocations()); - computeMetdataAndSendFixedBlock( - datanode, localBlockFile, lb, blockContentsSize); - numBlocksFixed++; + // The corrupted file is a ReedSolomon parity file + if (isRsParityFile(srcPath)) { + return processCorruptParityFile(srcPath, rsEncoder, progress); + } - LOG.info("Adding " + srcPath + " to history"); - history.put(srcPath.toString(), new java.util.Date()); - } finally { - localBlockFile.delete(); + // The corrupted file is a source file + RaidNode.ParityFilePair ppair = + RaidNode.xorParityForSource(srcPath, getConf()); + Decoder decoder = null; + if (ppair != null) { + decoder = xorDecoder; + } else { + ppair = RaidNode.rsParityForSource(srcPath, getConf()); + if (ppair != null) { + decoder = rsDecoder; + } } - } - LOG.info("Fixed " + numBlocksFixed + " blocks in " + srcPath); - numFilesFixed++; - } - /** - * checks whether file is xor parity file - */ - boolean isXorParityFile(Path p) { - String pathStr = p.toUri().getPath(); - if (pathStr.contains(RaidNode.HAR_SUFFIX)) { + // If we have a parity file, process the file and fix it. + if (ppair != null) { + return processCorruptFile(srcPath, ppair, decoder, progress); + } + + // there was nothing to do return false; } - return pathStr.startsWith(xorPrefix); - } - /** - * checks whether file is rs parity file - */ - boolean isRsParityFile(Path p) { - String pathStr = p.toUri().getPath(); - if (pathStr.contains(RaidNode.HAR_SUFFIX)) { - return false; + /** + * Sorts source files ahead of parity files. + */ + void sortCorruptFiles(List files) { + // TODO: We should first fix the files that lose more blocks + Comparator comp = new Comparator() { + public int compare(Path p1, Path p2) { + if (isXorParityFile(p2) || isRsParityFile(p2)) { + // If p2 is a parity file, p1 is smaller. + return -1; + } + if (isXorParityFile(p1) || isRsParityFile(p1)) { + // If p1 is a parity file, p2 is smaller. + return 1; + } + // If both are source files, they are equal. + return 0; + } + }; + Collections.sort(files, comp); } - return pathStr.startsWith(rsPrefix); - } - /** - * Returns a DistributedFileSystem hosting the path supplied. - */ - protected DistributedFileSystem getDFS(Path p) throws IOException { - return (DistributedFileSystem) p.getFileSystem(getConf()); - } - - /** - * Fixes corrupt blocks in a parity file. - * This function uses the corresponding source file to regenerate parity - * file blocks. - */ - void processCorruptParityFile(Path parityPath, Encoder encoder) + /** + * Returns a DistributedFileSystem hosting the path supplied. + */ + protected DistributedFileSystem getDFS(Path p) throws IOException { + return (DistributedFileSystem) p.getFileSystem(getConf()); + } + + /** + * Reads through a corrupt source file fixing corrupt blocks on the way. + * @param srcPath Path identifying the corrupt file. + * @throws IOException + * @return true if file has been fixed, false if no fixing + * was necessary or possible. 
+ */ + boolean processCorruptFile(Path srcPath, RaidNode.ParityFilePair parityPair, + Decoder decoder, Progressable progress) throws IOException { - LOG.info("Processing corrupt file " + parityPath); - Path srcPath = sourcePathFromParityPath(parityPath); - if (srcPath == null) { - LOG.warn("Unusable parity file " + parityPath); - return; - } - - DistributedFileSystem parityFs = getDFS(parityPath); - FileStatus parityStat = parityFs.getFileStatus(parityPath); - long blockSize = parityStat.getBlockSize(); - long parityFileSize = parityStat.getLen(); - FileStatus srcStat = getDFS(srcPath).getFileStatus(srcPath); - long srcFileSize = srcStat.getLen(); - - // Check timestamp. - if (srcStat.getModificationTime() != parityStat.getModificationTime()) { - LOG.info("Mismatching timestamp for " + srcPath + " and " + parityPath + - ", moving on..."); - return; - } - - String uriPath = parityPath.toUri().getPath(); - int numBlocksFixed = 0; - List corrupt = RaidDFSUtil.corruptBlocksInFile( - parityFs, uriPath, 0, parityFileSize); - for (LocatedBlock lb: corrupt) { - Block corruptBlock = lb.getBlock(); - long corruptOffset = lb.getStartOffset(); - - LOG.info("Found corrupt block " + corruptBlock + - ", offset " + corruptOffset); - - File localBlockFile = - File.createTempFile(corruptBlock.getBlockName(), ".tmp"); - localBlockFile.deleteOnExit(); - - try { - encoder.recoverParityBlockToFile(parityFs, srcPath, srcFileSize, - blockSize, parityPath, corruptOffset, localBlockFile); - // We have a the contents of the block, send them. - DatanodeInfo datanode = chooseDatanode(lb.getLocations()); - computeMetdataAndSendFixedBlock( - datanode, localBlockFile, lb, blockSize); - - numBlocksFixed++; - LOG.info("Adding " + parityPath + " to history"); - history.put(parityPath.toString(), new java.util.Date()); - } finally { - localBlockFile.delete(); + LOG.info("Processing corrupt file " + srcPath); + + DistributedFileSystem srcFs = getDFS(srcPath); + FileStatus srcStat = srcFs.getFileStatus(srcPath); + long blockSize = srcStat.getBlockSize(); + long srcFileSize = srcStat.getLen(); + String uriPath = srcPath.toUri().getPath(); + + int numBlocksFixed = 0; + List corrupt = + RaidDFSUtil.corruptBlocksInFile(srcFs, uriPath, 0, srcFileSize); + if (corrupt.size() == 0) { + return false; + } + for (LocatedBlock lb: corrupt) { + Block corruptBlock = lb.getBlock(); + long corruptOffset = lb.getStartOffset(); + + LOG.info("Found corrupt block " + corruptBlock + + ", offset " + corruptOffset); + + final long blockContentsSize = + Math.min(blockSize, srcFileSize - corruptOffset); + File localBlockFile = + File.createTempFile(corruptBlock.getBlockName(), ".tmp"); + localBlockFile.deleteOnExit(); + + try { + decoder.recoverBlockToFile(srcFs, srcPath, parityPair.getFileSystem(), + parityPair.getPath(), blockSize, + corruptOffset, localBlockFile, + blockContentsSize); + + // We have a the contents of the block, send them. + DatanodeInfo datanode = chooseDatanode(lb.getLocations()); + computeMetadataAndSendFixedBlock(datanode, localBlockFile, + lb, blockContentsSize); + numBlocksFixed++; + } finally { + localBlockFile.delete(); + } + progress.progress(); } + LOG.info("Fixed " + numBlocksFixed + " blocks in " + srcPath); + return true; } - LOG.info("Fixed " + numBlocksFixed + " blocks in " + parityPath); - numFilesFixed++; - } - /** - * Reads through a parity HAR part file, fixing corrupt blocks on the way. - * A HAR block can contain many file blocks, as long as the HAR part file - * block size is a multiple of the file block size. 
- */ - void processCorruptParityHarPartFile(Path partFile) throws IOException { - LOG.info("Processing corrupt file " + partFile); - // Get some basic information. - DistributedFileSystem dfs = getDFS(partFile); - FileStatus partFileStat = dfs.getFileStatus(partFile); - long partFileSize = partFileStat.getLen(); - long partFileBlockSize = partFileStat.getBlockSize(); - LOG.info(partFile + " has block size " + partFileBlockSize); - - // Find the path to the index file. - // Parity file HARs are only one level deep, so the index files is at the - // same level as the part file. - String harDirectory = partFile.toUri().getPath(); // Temporarily. - harDirectory = - harDirectory.substring(0, harDirectory.lastIndexOf(Path.SEPARATOR)); - Path indexFile = new Path(harDirectory + "/" + HarIndex.indexFileName); - FileStatus indexStat = dfs.getFileStatus(indexFile); - // Parses through the HAR index file. - HarIndex harIndex = new HarIndex(dfs.open(indexFile), indexStat.getLen()); - - String uriPath = partFile.toUri().getPath(); - int numBlocksFixed = 0; - List corrupt = RaidDFSUtil.corruptBlocksInFile( - dfs, uriPath, 0, partFileSize); - for (LocatedBlock lb: corrupt) { - Block corruptBlock = lb.getBlock(); - long corruptOffset = lb.getStartOffset(); - - File localBlockFile = - File.createTempFile(corruptBlock.getBlockName(), ".tmp"); - localBlockFile.deleteOnExit(); - processCorruptParityHarPartBlock( - dfs, partFile, corruptBlock, corruptOffset, partFileStat, harIndex, - localBlockFile); - // Now we have recovered the part file block locally, send it. - try { - DatanodeInfo datanode = chooseDatanode(lb.getLocations()); - computeMetdataAndSendFixedBlock(datanode, localBlockFile, - lb, localBlockFile.length()); - numBlocksFixed++; + /** + * Fixes corrupt blocks in a parity file. + * This function uses the corresponding source file to regenerate parity + * file blocks. + * @return true if file has been fixed, false if no fixing + * was necessary or possible. + */ + boolean processCorruptParityFile(Path parityPath, Encoder encoder, + Progressable progress) + throws IOException { + LOG.info("Processing corrupt file " + parityPath); + Path srcPath = sourcePathFromParityPath(parityPath); + if (srcPath == null) { + LOG.warn("Unusable parity file " + parityPath); + return false; + } + + DistributedFileSystem parityFs = getDFS(parityPath); + FileStatus parityStat = parityFs.getFileStatus(parityPath); + long blockSize = parityStat.getBlockSize(); + long parityFileSize = parityStat.getLen(); + FileStatus srcStat = getDFS(srcPath).getFileStatus(srcPath); + long srcFileSize = srcStat.getLen(); + + // Check timestamp. 
+ if (srcStat.getModificationTime() != parityStat.getModificationTime()) { + LOG.info("Mismatching timestamp for " + srcPath + " and " + parityPath + + ", moving on..."); + return false; + } + + String uriPath = parityPath.toUri().getPath(); + int numBlocksFixed = 0; + List corrupt = + RaidDFSUtil.corruptBlocksInFile(parityFs, uriPath, 0, parityFileSize); + if (corrupt.size() == 0) { + return false; + } + for (LocatedBlock lb: corrupt) { + Block corruptBlock = lb.getBlock(); + long corruptOffset = lb.getStartOffset(); + + LOG.info("Found corrupt block " + corruptBlock + + ", offset " + corruptOffset); + + File localBlockFile = + File.createTempFile(corruptBlock.getBlockName(), ".tmp"); + localBlockFile.deleteOnExit(); + + try { + encoder.recoverParityBlockToFile(parityFs, srcPath, srcFileSize, + blockSize, parityPath, + corruptOffset, localBlockFile); + // We have a the contents of the block, send them. + DatanodeInfo datanode = chooseDatanode(lb.getLocations()); + computeMetadataAndSendFixedBlock(datanode, localBlockFile, lb, + blockSize); + + numBlocksFixed++; + } finally { + localBlockFile.delete(); + } + progress.progress(); + } + LOG.info("Fixed " + numBlocksFixed + " blocks in " + parityPath); + return true; + } - LOG.info("Adding " + partFile + " to history"); - history.put(partFile.toString(), new java.util.Date()); - } finally { - localBlockFile.delete(); + /** + * Reads through a parity HAR part file, fixing corrupt blocks on the way. + * A HAR block can contain many file blocks, as long as the HAR part file + * block size is a multiple of the file block size. + * @return true if file has been fixed, false if no fixing + * was necessary or possible. + */ + boolean processCorruptParityHarPartFile(Path partFile, + Progressable progress) + throws IOException { + LOG.info("Processing corrupt file " + partFile); + // Get some basic information. + DistributedFileSystem dfs = getDFS(partFile); + FileStatus partFileStat = dfs.getFileStatus(partFile); + long partFileSize = partFileStat.getLen(); + long partFileBlockSize = partFileStat.getBlockSize(); + LOG.info(partFile + " has block size " + partFileBlockSize); + + // Find the path to the index file. + // Parity file HARs are only one level deep, so the index files is at the + // same level as the part file. + String harDirectory = partFile.toUri().getPath(); // Temporarily. + harDirectory = + harDirectory.substring(0, harDirectory.lastIndexOf(Path.SEPARATOR)); + Path indexFile = new Path(harDirectory + "/" + HarIndex.indexFileName); + FileStatus indexStat = dfs.getFileStatus(indexFile); + // Parses through the HAR index file. + HarIndex harIndex = new HarIndex(dfs.open(indexFile), indexStat.getLen()); + + String uriPath = partFile.toUri().getPath(); + int numBlocksFixed = 0; + List corrupt = + RaidDFSUtil.corruptBlocksInFile(dfs, uriPath, 0, partFileSize); + if (corrupt.size() == 0) { + return false; + } + for (LocatedBlock lb: corrupt) { + Block corruptBlock = lb.getBlock(); + long corruptOffset = lb.getStartOffset(); + + File localBlockFile = + File.createTempFile(corruptBlock.getBlockName(), ".tmp"); + localBlockFile.deleteOnExit(); + processCorruptParityHarPartBlock(dfs, partFile, corruptBlock, + corruptOffset, partFileStat, harIndex, + localBlockFile, progress); + // Now we have recovered the part file block locally, send it. 
+ try { + DatanodeInfo datanode = chooseDatanode(lb.getLocations()); + computeMetadataAndSendFixedBlock(datanode, localBlockFile, + lb, localBlockFile.length()); + numBlocksFixed++; + } finally { + localBlockFile.delete(); + } + progress.progress(); } + LOG.info("Fixed " + numBlocksFixed + " blocks in " + partFile); + return true; } - LOG.info("Fixed " + numBlocksFixed + " blocks in " + partFile); - numFilesFixed++; - } - /** - * This fixes a single part file block by recovering in sequence each - * parity block in the part file block. - */ - private void processCorruptParityHarPartBlock( - FileSystem dfs, Path partFile, Block corruptBlock, long corruptOffset, - FileStatus partFileStat, HarIndex harIndex, File localBlockFile) - throws IOException { - String partName = partFile.toUri().getPath(); // Temporarily. - partName = partName.substring(1 + partName.lastIndexOf(Path.SEPARATOR)); + /** + * This fixes a single part file block by recovering in sequence each + * parity block in the part file block. + */ + private void processCorruptParityHarPartBlock(FileSystem dfs, Path partFile, + Block corruptBlock, + long corruptOffset, + FileStatus partFileStat, + HarIndex harIndex, + File localBlockFile, + Progressable progress) + throws IOException { + String partName = partFile.toUri().getPath(); // Temporarily. + partName = partName.substring(1 + partName.lastIndexOf(Path.SEPARATOR)); - OutputStream out = new FileOutputStream(localBlockFile); + OutputStream out = new FileOutputStream(localBlockFile); - try { - // A HAR part file block could map to several parity files. We need to - // use all of them to recover this block. - final long corruptEnd = Math.min(corruptOffset + partFileStat.getBlockSize(), - partFileStat.getLen()); - for (long offset = corruptOffset; offset < corruptEnd; ) { - HarIndex.IndexEntry entry = harIndex.findEntry(partName, offset); - if (entry == null) { - String msg = "Corrupt index file has no matching index entry for " + - partName + ":" + offset; - LOG.warn(msg); - throw new IOException(msg); - } - Path parityFile = new Path(entry.fileName); - Encoder encoder; - if (isXorParityFile(parityFile)) { - encoder = xorEncoder; - } else if (isRsParityFile(parityFile)) { - encoder = rsEncoder; - } else { - String msg = "Could not figure out parity file correctly"; - LOG.warn(msg); - throw new IOException(msg); - } - Path srcFile = sourcePathFromParityPath(parityFile); - FileStatus srcStat = dfs.getFileStatus(srcFile); - if (srcStat.getModificationTime() != entry.mtime) { - String msg = "Modification times of " + parityFile + " and " + srcFile + - " do not match."; - LOG.warn(msg); - throw new IOException(msg); - } - long corruptOffsetInParity = offset - entry.startOffset; - LOG.info(partFile + ":" + offset + " maps to " + - parityFile + ":" + corruptOffsetInParity + - " and will be recovered from " + srcFile); - encoder.recoverParityBlockToStream(dfs, srcFile, srcStat.getLen(), - srcStat.getBlockSize(), parityFile, corruptOffsetInParity, out); - // Finished recovery of one parity block. Since a parity block has the - // same size as a source block, we can move offset by source block size. - offset += srcStat.getBlockSize(); - LOG.info("Recovered " + srcStat.getBlockSize() + " part file bytes "); - if (offset > corruptEnd) { - String msg = - "Recovered block spills across part file blocks. Cannot continue..."; - throw new IOException(msg); + try { + // A HAR part file block could map to several parity files. We need to + // use all of them to recover this block. 
+ final long corruptEnd = Math.min(corruptOffset + + partFileStat.getBlockSize(), + partFileStat.getLen()); + for (long offset = corruptOffset; offset < corruptEnd; ) { + HarIndex.IndexEntry entry = harIndex.findEntry(partName, offset); + if (entry == null) { + String msg = "Corrupt index file has no matching index entry for " + + partName + ":" + offset; + LOG.warn(msg); + throw new IOException(msg); + } + Path parityFile = new Path(entry.fileName); + Encoder encoder; + if (isXorParityFile(parityFile)) { + encoder = xorEncoder; + } else if (isRsParityFile(parityFile)) { + encoder = rsEncoder; + } else { + String msg = "Could not figure out parity file correctly"; + LOG.warn(msg); + throw new IOException(msg); + } + Path srcFile = sourcePathFromParityPath(parityFile); + FileStatus srcStat = dfs.getFileStatus(srcFile); + if (srcStat.getModificationTime() != entry.mtime) { + String msg = "Modification times of " + parityFile + " and " + + srcFile + " do not match."; + LOG.warn(msg); + throw new IOException(msg); + } + long corruptOffsetInParity = offset - entry.startOffset; + LOG.info(partFile + ":" + offset + " maps to " + + parityFile + ":" + corruptOffsetInParity + + " and will be recovered from " + srcFile); + encoder.recoverParityBlockToStream(dfs, srcFile, srcStat.getLen(), + srcStat.getBlockSize(), parityFile, + corruptOffsetInParity, out); + // Finished recovery of one parity block. Since a parity block has the + // same size as a source block, we can move offset by source block size. + offset += srcStat.getBlockSize(); + LOG.info("Recovered " + srcStat.getBlockSize() + " part file bytes "); + if (offset > corruptEnd) { + String msg = + "Recovered block spills across part file blocks. Cannot continue."; + throw new IOException(msg); + } + progress.progress(); } + } finally { + out.close(); } - } finally { - out.close(); } - } - /** - * Choose a datanode (hostname:portnumber). The datanode is chosen at - * random from the live datanodes. - * @param locationsToAvoid locations to avoid. - * @return A string in the format name:port. - * @throws IOException - */ - private DatanodeInfo chooseDatanode(DatanodeInfo[] locationsToAvoid) - throws IOException { - DistributedFileSystem dfs = getDFS(new Path("/")); - DatanodeInfo[] live = dfs.getClient().datanodeReport( - DatanodeReportType.LIVE); - LOG.info("Choosing a datanode from " + live.length + - " live nodes while avoiding " + locationsToAvoid.length); - Random rand = new Random(); - DatanodeInfo chosen = null; - int maxAttempts = 1000; - for (int i = 0; i < maxAttempts && chosen == null; i++) { - int idx = rand.nextInt(live.length); - chosen = live[idx]; - for (DatanodeInfo avoid: locationsToAvoid) { - if (chosen.name.equals(avoid.name)) { - LOG.info("Avoiding " + avoid.name); - chosen = null; - break; + /** + * Choose a datanode (hostname:portnumber). The datanode is chosen at + * random from the live datanodes. + * @param locationsToAvoid locations to avoid. 
+ * @return A datanode + * @throws IOException + */ + private DatanodeInfo chooseDatanode(DatanodeInfo[] locationsToAvoid) + throws IOException { + DistributedFileSystem dfs = getDFS(new Path("/")); + DatanodeInfo[] live = + dfs.getClient().datanodeReport(DatanodeReportType.LIVE); + LOG.info("Choosing a datanode from " + live.length + + " live nodes while avoiding " + locationsToAvoid.length); + Random rand = new Random(); + DatanodeInfo chosen = null; + int maxAttempts = 1000; + for (int i = 0; i < maxAttempts && chosen == null; i++) { + int idx = rand.nextInt(live.length); + chosen = live[idx]; + for (DatanodeInfo avoid: locationsToAvoid) { + if (chosen.name.equals(avoid.name)) { + LOG.info("Avoiding " + avoid.name); + chosen = null; + break; + } } } + if (chosen == null) { + throw new IOException("Could not choose datanode"); + } + LOG.info("Choosing datanode " + chosen.name); + return chosen; } - if (chosen == null) { - throw new IOException("Could not choose datanode"); - } - LOG.info("Choosing datanode " + chosen.name); - return chosen; - } - /** - * Reads data from the data stream provided and computes metadata. - */ - static DataInputStream computeMetadata( - Configuration conf, InputStream dataStream) throws IOException { - ByteArrayOutputStream mdOutBase = new ByteArrayOutputStream(1024*1024); - DataOutputStream mdOut = new DataOutputStream(mdOutBase); - - // First, write out the version. - mdOut.writeShort(FSDataset.METADATA_VERSION); - - // Create a summer and write out its header. - int bytesPerChecksum = conf.getInt("io.bytes.per.checksum", 512); - DataChecksum sum = DataChecksum.newDataChecksum( - DataChecksum.CHECKSUM_CRC32, - bytesPerChecksum); - sum.writeHeader(mdOut); - - // Buffer to read in a chunk of data. - byte[] buf = new byte[bytesPerChecksum]; - // Buffer to store the checksum bytes. - byte[] chk = new byte[sum.getChecksumSize()]; - - // Read data till we reach the end of the input stream. - int bytesSinceFlush = 0; - while (true) { - // Read some bytes. - int bytesRead = dataStream.read( - buf, bytesSinceFlush, bytesPerChecksum-bytesSinceFlush); - if (bytesRead == -1) { - if (bytesSinceFlush > 0) { + /** + * Reads data from the data stream provided and computes metadata. + */ + static DataInputStream computeMetadata(Configuration conf, + InputStream dataStream) + throws IOException { + ByteArrayOutputStream mdOutBase = new ByteArrayOutputStream(1024*1024); + DataOutputStream mdOut = new DataOutputStream(mdOutBase); + + // First, write out the version. + mdOut.writeShort(FSDataset.METADATA_VERSION); + + // Create a summer and write out its header. + int bytesPerChecksum = conf.getInt("io.bytes.per.checksum", 512); + DataChecksum sum = + DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, + bytesPerChecksum); + sum.writeHeader(mdOut); + + // Buffer to read in a chunk of data. + byte[] buf = new byte[bytesPerChecksum]; + // Buffer to store the checksum bytes. + byte[] chk = new byte[sum.getChecksumSize()]; + + // Read data till we reach the end of the input stream. + int bytesSinceFlush = 0; + while (true) { + // Read some bytes. + int bytesRead = dataStream.read(buf, bytesSinceFlush, + bytesPerChecksum-bytesSinceFlush); + if (bytesRead == -1) { + if (bytesSinceFlush > 0) { + boolean reset = true; + sum.writeValue(chk, 0, reset); // This also resets the sum. + // Write the checksum to the stream. + mdOut.write(chk, 0, chk.length); + bytesSinceFlush = 0; + } + break; + } + // Update the checksum. 
+ sum.update(buf, bytesSinceFlush, bytesRead); + bytesSinceFlush += bytesRead; + + // Flush the checksum if necessary. + if (bytesSinceFlush == bytesPerChecksum) { boolean reset = true; sum.writeValue(chk, 0, reset); // This also resets the sum. // Write the checksum to the stream. mdOut.write(chk, 0, chk.length); bytesSinceFlush = 0; } - break; - } - // Update the checksum. - sum.update(buf, bytesSinceFlush, bytesRead); - bytesSinceFlush += bytesRead; - - // Flush the checksum if necessary. - if (bytesSinceFlush == bytesPerChecksum) { - boolean reset = true; - sum.writeValue(chk, 0, reset); // This also resets the sum. - // Write the checksum to the stream. - mdOut.write(chk, 0, chk.length); - bytesSinceFlush = 0; } + + byte[] mdBytes = mdOutBase.toByteArray(); + return new DataInputStream(new ByteArrayInputStream(mdBytes)); } - byte[] mdBytes = mdOutBase.toByteArray(); - return new DataInputStream(new ByteArrayInputStream(mdBytes)); - } + private void computeMetadataAndSendFixedBlock(DatanodeInfo datanode, + File localBlockFile, + LocatedBlock block, + long blockSize) + throws IOException { - private void computeMetdataAndSendFixedBlock( - DatanodeInfo datanode, - File localBlockFile, LocatedBlock block, long blockSize - ) throws IOException { - - LOG.info("Computing metdata"); - InputStream blockContents = null; - DataInputStream blockMetadata = null; - try { - blockContents = new FileInputStream(localBlockFile); - blockMetadata = computeMetadata(getConf(), blockContents); - blockContents.close(); - // Reopen - blockContents = new FileInputStream(localBlockFile); - sendFixedBlock(datanode, blockContents, blockMetadata, block, blockSize); - } finally { - if (blockContents != null) { + LOG.info("Computing metdata"); + InputStream blockContents = null; + DataInputStream blockMetadata = null; + try { + blockContents = new FileInputStream(localBlockFile); + blockMetadata = computeMetadata(getConf(), blockContents); blockContents.close(); - blockContents = null; - } - if (blockMetadata != null) { - blockMetadata.close(); - blockMetadata = null; + // Reopen + blockContents = new FileInputStream(localBlockFile); + sendFixedBlock(datanode, blockContents, blockMetadata, block, + blockSize); + } finally { + if (blockContents != null) { + blockContents.close(); + blockContents = null; + } + if (blockMetadata != null) { + blockMetadata.close(); + blockMetadata = null; + } } } - } - /** - * Send a generated block to a datanode. - * @param datanode Chosen datanode name in host:port form. - * @param blockContents Stream with the block contents. - * @param corruptBlock Block identifying the block to be sent. - * @param blockSize size of the block. 
- * @throws IOException - */ - private void sendFixedBlock( - DatanodeInfo datanode, - final InputStream blockContents, DataInputStream metadataIn, - LocatedBlock block, long blockSize - ) throws IOException { - InetSocketAddress target = NetUtils.createSocketAddr(datanode.name); - Socket sock = SocketChannel.open().socket(); - - int readTimeout = getConf().getInt(BLOCKFIX_READ_TIMEOUT, - HdfsConstants.READ_TIMEOUT); - NetUtils.connect(sock, target, readTimeout); - sock.setSoTimeout(readTimeout); - - int writeTimeout = getConf().getInt(BLOCKFIX_WRITE_TIMEOUT, - HdfsConstants.WRITE_TIMEOUT); - - OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout); - DataOutputStream out = new DataOutputStream( - new BufferedOutputStream(baseStream, FSConstants.SMALL_BUFFER_SIZE)); - - boolean corruptChecksumOk = false; - boolean chunkOffsetOK = false; - boolean verifyChecksum = true; - boolean transferToAllowed = false; - - try { - LOG.info("Sending block " + block.getBlock() + - " from " + sock.getLocalSocketAddress().toString() + - " to " + sock.getRemoteSocketAddress().toString() + - " " + blockSize + " bytes"); - RaidBlockSender blockSender = new RaidBlockSender( - block.getBlock(), blockSize, 0, blockSize, - corruptChecksumOk, chunkOffsetOK, verifyChecksum, transferToAllowed, - metadataIn, new RaidBlockSender.InputStreamFactory() { - @Override - public InputStream createStream(long offset) throws IOException { - // we are passing 0 as the offset above, so we can safely ignore - // the offset passed - return blockContents; - } - }); - - DatanodeInfo[] nodes = new DatanodeInfo[]{datanode}; - DataTransferProtocol.Sender.opWriteBlock( - out, block.getBlock(), 1, - DataTransferProtocol.BlockConstructionStage.PIPELINE_SETUP_CREATE, - 0, blockSize, 0, "", null, nodes, block.getBlockToken()); - blockSender.sendBlock(out, baseStream); - - LOG.info("Sent block " + block.getBlock() + " to " + datanode.name); - } finally { - out.close(); + /** + * Send a generated block to a datanode. + * @param datanode Chosen datanode name in host:port form. + * @param blockContents Stream with the block contents. + * @param corruptBlock Block identifying the block to be sent. + * @param blockSize size of the block. + * @throws IOException + */ + private void sendFixedBlock(DatanodeInfo datanode, + final InputStream blockContents, + DataInputStream metadataIn, + LocatedBlock block, long blockSize) + throws IOException { + InetSocketAddress target = NetUtils.createSocketAddr(datanode.name); + Socket sock = SocketChannel.open().socket(); + + int readTimeout = + getConf().getInt(BLOCKFIX_READ_TIMEOUT, + HdfsConstants.READ_TIMEOUT); + NetUtils.connect(sock, target, readTimeout); + sock.setSoTimeout(readTimeout); + + int writeTimeout = getConf().getInt(BLOCKFIX_WRITE_TIMEOUT, + HdfsConstants.WRITE_TIMEOUT); + + OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout); + DataOutputStream out = + new DataOutputStream(new BufferedOutputStream(baseStream, + FSConstants. 
+ SMALL_BUFFER_SIZE)); + + boolean corruptChecksumOk = false; + boolean chunkOffsetOK = false; + boolean verifyChecksum = true; + boolean transferToAllowed = false; + + try { + LOG.info("Sending block " + block.getBlock() + + " from " + sock.getLocalSocketAddress().toString() + + " to " + sock.getRemoteSocketAddress().toString() + + " " + blockSize + " bytes"); + RaidBlockSender blockSender = + new RaidBlockSender(block.getBlock(), blockSize, 0, blockSize, + corruptChecksumOk, chunkOffsetOK, verifyChecksum, + transferToAllowed, metadataIn, + new RaidBlockSender.InputStreamFactory() { + @Override + public InputStream + createStream(long offset) throws IOException { + // we are passing 0 as the offset above, + // so we can safely ignore + // the offset passed + return blockContents; + } + }); + + DatanodeInfo[] nodes = new DatanodeInfo[]{datanode}; + DataTransferProtocol.Sender.opWriteBlock(out, block.getBlock(), 1, + DataTransferProtocol. + BlockConstructionStage. + PIPELINE_SETUP_CREATE, + 0, blockSize, 0, "", null, + nodes, block.getBlockToken()); + blockSender.sendBlock(out, baseStream); + + LOG.info("Sent block " + block.getBlock() + " to " + datanode.name); + } finally { + out.close(); + } } - } - /** - * returns the source file corresponding to a parity file - */ - Path sourcePathFromParityPath(Path parityPath) { - String parityPathStr = parityPath.toUri().getPath(); - if (parityPathStr.startsWith(xorPrefix)) { - // Remove the prefix to get the source file. - String src = parityPathStr.replaceFirst(xorPrefix, "/"); - return new Path(src); - } else if (parityPathStr.startsWith(rsPrefix)) { - // Remove the prefix to get the source file. - String src = parityPathStr.replaceFirst(rsPrefix, "/"); - return new Path(src); + /** + * returns the source file corresponding to a parity file + */ + Path sourcePathFromParityPath(Path parityPath) { + String parityPathStr = parityPath.toUri().getPath(); + if (parityPathStr.startsWith(xorPrefix)) { + // Remove the prefix to get the source file. + String src = parityPathStr.replaceFirst(xorPrefix, "/"); + return new Path(src); + } else if (parityPathStr.startsWith(rsPrefix)) { + // Remove the prefix to get the source file. + String src = parityPathStr.replaceFirst(rsPrefix, "/"); + return new Path(src); + } + return null; + } + + /** + * Returns the corrupt blocks in a file. + */ + List corruptBlocksInFile(DistributedFileSystem fs, + String uriPath, FileStatus stat) + throws IOException { + List corrupt = new LinkedList(); + LocatedBlocks locatedBlocks = + RaidDFSUtil.getBlockLocations(fs, uriPath, 0, stat.getLen()); + for (LocatedBlock b: locatedBlocks.getLocatedBlocks()) { + if (b.isCorrupt() || + (b.getLocations().length == 0 && b.getBlockSize() > 0)) { + corrupt.add(b); + } + } + return corrupt; } - return null; } + } Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistBlockFixer.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistBlockFixer.java?rev=1040417&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistBlockFixer.java (added) +++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistBlockFixer.java Tue Nov 30 06:23:55 2010 @@ -0,0 +1,671 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.raid; + +import java.io.IOException; +import java.io.PrintStream; +import java.io.InputStreamReader; +import java.io.BufferedReader; +import java.util.List; +import java.util.LinkedList; +import java.util.Map; +import java.util.HashMap; +import java.util.Set; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Date; +import java.text.SimpleDateFormat; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; + +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.RaidDFSUtil; + +import org.apache.hadoop.conf.Configuration; + +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; + +import org.apache.hadoop.util.StringUtils; + +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.InputSplit; + +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; + +/** + * distributed block fixer, uses map reduce jobs to fix corrupt files + * + * configuration options + * raid.blockfix.filespertask - number of corrupt files to fix in a single + * map reduce task (i.e., at one mapper node) + * + * raid.blockfix.fairscheduler.pool - the pool to use for block fixer jobs + * + * raid.blockfix.maxpendingfiles - maximum number of files to fix + * simultaneously + */ +public class DistBlockFixer extends BlockFixer { + // volatile should be sufficient since only the block fixer thread + // updates numJobsRunning (other threads may read) + private volatile int numJobsRunning = 0; + + private static final String WORK_DIR_PREFIX = "blockfixer"; + private static final String IN_FILE_SUFFIX = ".in"; + private static final String PART_PREFIX = "part-"; + + private static final String BLOCKFIX_FILES_PER_TASK = + "raid.blockfix.filespertask"; + private static final String BLOCKFIX_MAX_PENDING_FILES = + "raid.blockfix.maxpendingfiles"; + private static final String BLOCKFIX_POOL = + "raid.blockfix.fairscheduler.pool"; + // mapred.fairscheduler.pool is only used in the local configuration + // passed to a block 
fixing job + private static final String MAPRED_POOL = + "mapred.fairscheduler.pool"; + + // default number of files to fix in a task + private static final long DEFAULT_BLOCKFIX_FILES_PER_TASK = 10L; + + // default number of files to fix simultaneously + private static final long DEFAULT_BLOCKFIX_MAX_PENDING_FILES = 1000L; + + protected static final Log LOG = LogFactory.getLog(DistBlockFixer.class); + + // number of files to fix in a task + private long filesPerTask; + + // number of files to fix simultaneously + final private long maxPendingFiles; + + // number of files being fixed right now + private long pendingFiles; + + // pool name to use (may be null, in which case no special pool is used) + private String poolName; + + private long lastCheckTime; + + private final SimpleDateFormat dateFormat = + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + + private Map fileIndex = + new HashMap(); + private Map> jobIndex = + new HashMap>(); + + static enum Counter { + FILES_SUCCEEDED, FILES_FAILED, FILES_NOACTION + } + + public DistBlockFixer(Configuration conf) { + super(conf); + filesPerTask = DistBlockFixer.filesPerTask(getConf()); + maxPendingFiles = DistBlockFixer.maxPendingFiles(getConf()); + pendingFiles = 0L; + poolName = conf.get(BLOCKFIX_POOL); + + // start off due for the first iteration + lastCheckTime = System.currentTimeMillis() - blockFixInterval; + } + + /** + * determines how many files to fix in a single task + */ + protected static long filesPerTask(Configuration conf) { + return conf.getLong(BLOCKFIX_FILES_PER_TASK, + DEFAULT_BLOCKFIX_FILES_PER_TASK); + + } + /** + * determines how many files to fix simultaneously + */ + protected static long maxPendingFiles(Configuration conf) { + return conf.getLong(BLOCKFIX_MAX_PENDING_FILES, + DEFAULT_BLOCKFIX_MAX_PENDING_FILES); + } + + /** + * runs the block fixer periodically + */ + public void run() { + while (running) { + // check if it is time to run the block fixer + long now = System.currentTimeMillis(); + if (now >= lastCheckTime + blockFixInterval) { + lastCheckTime = now; + try { + checkAndFixBlocks(now); + } catch (InterruptedException ignore) { + LOG.info("interrupted"); + } catch (Exception e) { + // log exceptions and keep running + LOG.error(StringUtils.stringifyException(e)); + } catch (Error e) { + LOG.error(StringUtils.stringifyException(e)); + throw e; + } + } + + // try to sleep for the remainder of the interval + long sleepPeriod = (lastCheckTime - System.currentTimeMillis()) + + blockFixInterval; + + if ((sleepPeriod > 0L) && running) { + try { + Thread.sleep(sleepPeriod); + } catch (InterruptedException ignore) { + LOG.info("interrupted"); + } + } + } + } + + /** + * checks for corrupt blocks and fixes them (if any) + */ + private void checkAndFixBlocks(long startTime) + throws IOException, InterruptedException, ClassNotFoundException { + checkJobs(); + + if (pendingFiles >= maxPendingFiles) { + return; + } + + List corruptFiles = getCorruptFiles(); + filterUnfixableSourceFiles(corruptFiles.iterator()); + + String startTimeStr = dateFormat.format(new Date(startTime)); + + LOG.info("found " + corruptFiles.size() + " corrupt files"); + + if (corruptFiles.size() > 0) { + String jobName = "blockfixer." + startTime; + startJob(jobName, corruptFiles); + } + } + + /** + * Handle a failed job. 
+ */ + private void failJob(Job job) throws IOException { + // assume no files have been fixed + LOG.info("job " + job.getJobID() + "(" + job.getJobName() + + ") finished (failed)"); + for (CorruptFileInfo fileInfo: jobIndex.get(job)) { + fileInfo.fail(); + } + numJobsRunning--; + } + + /** + * Handle a successful job. + */ + private void succeedJob(Job job, long filesSucceeded, long filesFailed) + throws IOException { + LOG.info("job " + job.getJobID() + "(" + job.getJobName() + + ") finished (succeeded)"); + + if (filesFailed == 0) { + // no files have failed + for (CorruptFileInfo fileInfo: jobIndex.get(job)) { + fileInfo.succeed(); + } + } else { + // we have to look at the output to check which files have failed + Set failedFiles = getFailedFiles(job); + + for (CorruptFileInfo fileInfo: jobIndex.get(job)) { + if (failedFiles.contains(fileInfo.getFile().toString())) { + fileInfo.fail(); + } else { + // call succeed for files that have succeeded or for which no action + // was taken + fileInfo.succeed(); + } + } + } + // report succeeded files to metrics + incrFilesFixed(filesSucceeded); + numJobsRunning--; + } + + /** + * checks if jobs have completed and updates job and file index + * returns a list of failed files for restarting + */ + private void checkJobs() throws IOException { + Iterator jobIter = jobIndex.keySet().iterator(); + while(jobIter.hasNext()) { + Job job = jobIter.next(); + + try { + if (job.isComplete()) { + long filesSucceeded = + job.getCounters().findCounter(Counter.FILES_SUCCEEDED).getValue(); + long filesFailed = + job.getCounters().findCounter(Counter.FILES_FAILED).getValue(); + long filesNoAction = + job.getCounters().findCounter(Counter.FILES_NOACTION).getValue(); + int files = jobIndex.get(job).size(); + if (job.isSuccessful() && + (filesSucceeded + filesFailed + filesNoAction == + ((long) files))) { + // job has processed all files + succeedJob(job, filesSucceeded, filesFailed); + } else { + failJob(job); + } + jobIter.remove(); + } else { + LOG.info("job " + job.getJobName() + " still running"); + } + } catch (Exception e) { + LOG.error(StringUtils.stringifyException(e)); + failJob(job); + try { + job.killJob(); + } catch (Exception ee) { + LOG.error(StringUtils.stringifyException(ee)); + } + jobIter.remove(); + } + } + purgeFileIndex(); + } + + /** + * determines which files have failed for a given job + */ + private Set getFailedFiles(Job job) throws IOException { + Set failedFiles = new HashSet(); + + Path outDir = SequenceFileOutputFormat.getOutputPath(job); + FileSystem fs = outDir.getFileSystem(getConf()); + if (!fs.getFileStatus(outDir).isDir()) { + throw new IOException(outDir.toString() + " is not a directory"); + } + + FileStatus[] files = fs.listStatus(outDir); + + for (FileStatus f: files) { + Path fPath = f.getPath(); + if ((!f.isDir()) && (fPath.getName().startsWith(PART_PREFIX))) { + LOG.info("opening " + fPath.toString()); + SequenceFile.Reader reader = + new SequenceFile.Reader(fs, fPath, getConf()); + + Text key = new Text(); + Text value = new Text(); + while (reader.next(key, value)) { + failedFiles.add(key.toString()); + } + reader.close(); + } + } + return failedFiles; + } + + + /** + * purge expired jobs from the file index + */ + private void purgeFileIndex() { + Iterator fileIter = fileIndex.keySet().iterator(); + while(fileIter.hasNext()) { + String file = fileIter.next(); + if (fileIndex.get(file).isExpired()) { + fileIter.remove(); + } + } + + } + + /** + * creates and submits a job, updates file index and job index + */ + 
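+ // The job name passed in is "blockfixer.<startTime>" (see checkAndFixBlocks),
+ // so the input file is written under blockfixer/in/<jobName> and the task
+ // output under blockfixer/out/<jobName>.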
private Job startJob(String jobName, List corruptFiles) + throws IOException, InterruptedException, ClassNotFoundException { + Path inDir = new Path(WORK_DIR_PREFIX + "/in/" + jobName); + Path outDir = new Path(WORK_DIR_PREFIX + "/out/" + jobName); + List filesInJob = createInputFile(jobName, inDir, corruptFiles); + + Configuration jobConf = new Configuration(getConf()); + if (poolName != null) { + jobConf.set(MAPRED_POOL, poolName); + } + Job job = new Job(jobConf, jobName); + job.setJarByClass(getClass()); + job.setMapperClass(DistBlockFixerMapper.class); + job.setNumReduceTasks(0); + job.setInputFormatClass(DistBlockFixerInputFormat.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + + DistBlockFixerInputFormat.setInputPaths(job, inDir); + SequenceFileOutputFormat.setOutputPath(job, outDir); + + job.submit(); + // submit the job before inserting it into the index + // this way, if submit fails, we won't have added anything to the index + insertJob(job, filesInJob); + return job; + } + + /** + * inserts new job into file index and job index + */ + private void insertJob(Job job, List corruptFiles) { + List fileInfos = new LinkedList(); + + for (Path file: corruptFiles) { + CorruptFileInfo fileInfo = new CorruptFileInfo(file, job); + fileInfos.add(fileInfo); + fileIndex.put(file.toString(), fileInfo); + } + + jobIndex.put(job, fileInfos); + numJobsRunning++; + } + + /** + * creates the input file (containing the names of the files to be fixed + */ + private List createInputFile(String jobName, Path inDir, + List corruptFiles) + throws IOException { + + Path file = new Path(inDir, jobName + IN_FILE_SUFFIX); + FileSystem fs = file.getFileSystem(getConf()); + SequenceFile.Writer fileOut = SequenceFile.createWriter(fs, getConf(), file, + LongWritable.class, + Text.class); + long index = 0L; + + List filesAdded = new LinkedList(); + + for (Path corruptFile: corruptFiles) { + if (pendingFiles >= maxPendingFiles) { + break; + } + + String corruptFileName = corruptFile.toString(); + fileOut.append(new LongWritable(index++), new Text(corruptFileName)); + filesAdded.add(corruptFile); + pendingFiles++; + + if (index % filesPerTask == 0) { + fileOut.sync(); // create sync point to make sure we can split here + } + } + + fileOut.close(); + return filesAdded; + } + + /** + * gets a list of corrupt files from the name node + * and filters out files that are currently being fixed or + * that were recently fixed + */ + private List getCorruptFiles() throws IOException { + DistributedFileSystem dfs = (DistributedFileSystem) + (new Path("/")).getFileSystem(getConf()); + + String[] files = RaidDFSUtil.getCorruptFiles(dfs); + List corruptFiles = new LinkedList(); + + for (String f: files) { + Path p = new Path(f); + // filter out files that are being fixed or that were recently fixed + if (!fileIndex.containsKey(p.toString())) { + corruptFiles.add(p); + } + } + RaidUtils.filterTrash(getConf(), corruptFiles); + + return corruptFiles; + } + + /** + * returns the number of map reduce jobs running + */ + public int jobsRunning() { + return numJobsRunning; + } + + /** + * hold information about a corrupt file that is being fixed + */ + class CorruptFileInfo { + + private Path file; + private Job job; + private boolean done; + private long time; + + public CorruptFileInfo(Path file, Job job) { + this.file = file; + this.job = job; + this.done = false; + this.time = 0; + } + + public boolean isDone() { + return done; + } + 
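+ // A successfully fixed file stays in the index for historyInterval
+ // milliseconds, so it is not immediately re-queued while the namenode may
+ // still report it as corrupt; expired entries are removed by purgeFileIndex().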
+ public boolean isExpired() { + return done && ((System.currentTimeMillis() - time) > historyInterval); + } + + public Path getFile() { + return file; + } + + /** + * updates file index to record a failed attempt at fixing a file, + * immediately removes the entry from the file index + * (instead of letting it expire) + * so that we can retry right away + */ + public void fail() { + // remove this file from the index + CorruptFileInfo removed = fileIndex.remove(file.toString()); + if (removed == null) { + LOG.error("trying to remove file not in file index: " + + file.toString()); + } else { + LOG.info("fixing " + file.toString() + " failed"); + } + pendingFiles--; + } + + /** + * marks a file as fixed successfully + * and sets time stamp for expiry after specified interval + */ + public void succeed() { + // leave the file in the index, + // will be pruged later + job = null; + done = true; + time = System.currentTimeMillis(); + LOG.info("fixing " + file.toString() + " succeeded"); + pendingFiles--; + } + } + + static class DistBlockFixerInputFormat + extends SequenceFileInputFormat { + + protected static final Log LOG = + LogFactory.getLog(DistBlockFixerMapper.class); + + /** + * splits the input files into tasks handled by a single node + * we have to read the input files to do this based on a number of + * items in a sequence + */ + @Override + public List getSplits(JobContext job) + throws IOException { + long filesPerTask = DistBlockFixer.filesPerTask(job.getConfiguration()); + + Path[] inPaths = getInputPaths(job); + + List splits = new LinkedList(); + + long fileCounter = 0; + + for (Path inPath: inPaths) { + + FileSystem fs = inPath.getFileSystem(job.getConfiguration()); + + if (!fs.getFileStatus(inPath).isDir()) { + throw new IOException(inPath.toString() + " is not a directory"); + } + + FileStatus[] inFiles = fs.listStatus(inPath); + + for (FileStatus inFileStatus: inFiles) { + Path inFile = inFileStatus.getPath(); + + if (!inFileStatus.isDir() && + (inFile.getName().equals(job.getJobName() + IN_FILE_SUFFIX))) { + + fileCounter++; + SequenceFile.Reader inFileReader = + new SequenceFile.Reader(fs, inFile, job.getConfiguration()); + + long startPos = inFileReader.getPosition(); + long counter = 0; + + // create an input split every filesPerTask items in the sequence + LongWritable key = new LongWritable(); + Text value = new Text(); + try { + while (inFileReader.next(key, value)) { + if (counter % filesPerTask == filesPerTask - 1L) { + splits.add(new FileSplit(inFile, startPos, + inFileReader.getPosition() - + startPos, + null)); + startPos = inFileReader.getPosition(); + } + counter++; + } + + // create input split for remaining items if necessary + // this includes the case where no splits were created by the loop + if (startPos != inFileReader.getPosition()) { + splits.add(new FileSplit(inFile, startPos, + inFileReader.getPosition() - startPos, + null)); + } + } finally { + inFileReader.close(); + } + } + } + } + + LOG.info("created " + splits.size() + " input splits from " + + fileCounter + " files"); + + return splits; + } + + /** + * indicates that input file can be split + */ + @Override + public boolean isSplitable (JobContext job, Path file) { + return true; + } + } + + + /** + * mapper for fixing stripes with corrupt blocks + */ + static class DistBlockFixerMapper + extends Mapper { + + protected static final Log LOG = + LogFactory.getLog(DistBlockFixerMapper.class); + + /** + * fix a stripe + */ + @Override + public void map(LongWritable key, Text fileText, Context 
context) + throws IOException, InterruptedException { + + BlockFixerHelper helper = + new BlockFixerHelper(context.getConfiguration()); + + String fileStr = fileText.toString(); + LOG.info("fixing " + fileStr); + + Path file = new Path(fileStr); + boolean success = false; + + try { + boolean fixed = helper.fixFile(file, context); + + if (fixed) { + context.getCounter(Counter.FILES_SUCCEEDED).increment(1L); + } else { + context.getCounter(Counter.FILES_NOACTION).increment(1L); + } + } catch (Exception e) { + LOG.error(StringUtils.stringifyException(e)); + + // report file as failed + context.getCounter(Counter.FILES_FAILED).increment(1L); + String outkey = fileStr; + String outval = "failed"; + context.write(new Text(outkey), new Text(outval)); + } + + context.progress(); + } + } + +} Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/LocalBlockFixer.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/LocalBlockFixer.java?rev=1040417&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/LocalBlockFixer.java (added) +++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/LocalBlockFixer.java Tue Nov 30 06:23:55 2010 @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.raid; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.RaidDFSUtil; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; + +import org.apache.hadoop.util.StringUtils; + +import org.apache.hadoop.net.NetUtils; + +import org.apache.hadoop.hdfs.DistributedFileSystem; + +import org.apache.hadoop.raid.RaidNode; +import org.apache.hadoop.raid.RaidUtils; +import org.apache.hadoop.raid.protocol.PolicyInfo.ErasureCodeType; + +/** + * This class fixes source file blocks using the parity file, + * and parity file blocks using the source file. + * It periodically fetches the list of corrupt files from the namenode, + * and figures out the location of the bad block by reading through + * the corrupt file. 
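+ * Unlike DistBlockFixer, all fixing work runs locally in the RaidNode
+ * process instead of being farmed out to map reduce tasks.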
+ */ +public class LocalBlockFixer extends BlockFixer { + public static final Log LOG = LogFactory.getLog(LocalBlockFixer.class); + + private java.util.HashMap history; + + private BlockFixerHelper helper; + + public LocalBlockFixer(Configuration conf) throws IOException { + super(conf); + history = new java.util.HashMap(); + helper = new BlockFixerHelper(getConf()); + } + + public void run() { + while (running) { + try { + LOG.info("LocalBlockFixer continuing to run..."); + doFix(); + } catch (Exception e) { + LOG.error(StringUtils.stringifyException(e)); + } catch (Error err) { + LOG.error("Exiting after encountering " + + StringUtils.stringifyException(err)); + throw err; + } + } + } + + void doFix() throws InterruptedException, IOException { + while (running) { + // Sleep before proceeding to fix files. + Thread.sleep(blockFixInterval); + + // Purge history older than the history interval. + purgeHistory(); + + List corruptFiles = getCorruptFiles(); + + filterUnfixableSourceFiles(corruptFiles.iterator()); + + if (corruptFiles.isEmpty()) { + // If there are no corrupt files, retry after some time. + continue; + } + LOG.info("Found " + corruptFiles.size() + " corrupt files."); + + helper.sortCorruptFiles(corruptFiles); + + for (Path srcPath: corruptFiles) { + if (!running) break; + try { + boolean fixed = helper.fixFile(srcPath); + LOG.info("Adding " + srcPath + " to history"); + history.put(srcPath.toString(), new java.util.Date()); + if (fixed) { + incrFilesFixed(); + } + } catch (IOException ie) { + LOG.error("Hit error while processing " + srcPath + + ": " + StringUtils.stringifyException(ie)); + // Do nothing, move on to the next file. + } + } + } + } + + + /** + * We maintain history of fixed files because a fixed file may appear in + * the list of corrupt files if we loop around too quickly. + * This function removes the old items in the history so that we can + * recognize files that have actually become corrupt since being fixed. 
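+ * Entries older than historyInterval milliseconds are removed.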
+ */ + void purgeHistory() { + java.util.Date cutOff = new java.util.Date(System.currentTimeMillis() - + historyInterval); + List toRemove = new java.util.ArrayList(); + + for (String key: history.keySet()) { + java.util.Date item = history.get(key); + if (item.before(cutOff)) { + toRemove.add(key); + } + } + for (String key: toRemove) { + LOG.info("Removing " + key + " from history"); + history.remove(key); + } + } + + /** + * @return A list of corrupt files as obtained from the namenode + */ + List getCorruptFiles() throws IOException { + DistributedFileSystem dfs = helper.getDFS(new Path("/")); + + String[] files = RaidDFSUtil.getCorruptFiles(dfs); + List corruptFiles = new LinkedList(); + for (String f: files) { + Path p = new Path(f); + if (!history.containsKey(p.toString())) { + corruptFiles.add(p); + } + } + RaidUtils.filterTrash(getConf(), corruptFiles); + return corruptFiles; + } + +} + Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java?rev=1040417&r1=1040416&r2=1040417&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java (original) +++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java Tue Nov 30 06:23:55 2010 @@ -289,7 +289,7 @@ public abstract class RaidNode implement running = true; this.server.start(); // start RPC server - this.blockFixer = new BlockFixer(conf); + this.blockFixer = BlockFixer.createBlockFixer(conf); this.blockFixerThread = new Daemon(this.blockFixer); this.blockFixerThread.start(); Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java?rev=1040417&r1=1040416&r2=1040417&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java (original) +++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java Tue Nov 30 06:23:55 2010 @@ -288,7 +288,7 @@ public class RaidShell extends Configure public void recoverBlocks(String[] args, int startIndex) throws IOException { LOG.info("Recovering blocks for " + (args.length - startIndex) + " files"); - BlockFixer fixer = new BlockFixer(conf); + BlockFixer.BlockFixerHelper fixer = new BlockFixer.BlockFixerHelper(conf); for (int i = startIndex; i < args.length; i++) { String path = args[i]; fixer.fixFile(new Path(path));
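For readers wiring up the new code paths, the following is a minimal, self-contained sketch of how the DistBlockFixer configuration keys described above could be set before constructing a fixer. The property names and defaults come from the constants in DistBlockFixer; the class name BlockFixerSetupSketch, the pool name "raid", and the numeric values are illustrative assumptions, and constructing DistBlockFixer directly stands in for BlockFixer.createBlockFixer(conf), which RaidNode uses and which chooses between LocalBlockFixer and DistBlockFixer.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.raid.BlockFixer;
    import org.apache.hadoop.raid.DistBlockFixer;
    import org.apache.hadoop.util.Daemon;

    public class BlockFixerSetupSketch {
      public static void main(String[] args) throws InterruptedException {
        Configuration conf = new Configuration();
        // corrupt files handled by a single map task (default 10; 5 is illustrative)
        conf.setLong("raid.blockfix.filespertask", 5L);
        // cap on files under repair at any one time (default 1000; 200 is illustrative)
        conf.setLong("raid.blockfix.maxpendingfiles", 200L);
        // fair-scheduler pool for block fixing jobs; leave unset to use no special pool
        conf.set("raid.blockfix.fairscheduler.pool", "raid");

        // RaidNode obtains its fixer via BlockFixer.createBlockFixer(conf) and wraps
        // it in a Daemon thread; constructing DistBlockFixer directly just keeps
        // this sketch self-contained.
        BlockFixer fixer = new DistBlockFixer(conf);
        Daemon fixerThread = new Daemon(fixer);
        fixerThread.start();
        fixerThread.join();
      }
    }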