Subject: svn commit: r1021873 [2/3] - in /hadoop/mapreduce/trunk: ./ src/contrib/raid/src/java/org/apache/hadoop/hdfs/ src/contrib/raid/src/java/org/apache/hadoop/raid/ src/contrib/raid/src/test/org/apache/hadoop/hdfs/ src/contrib/raid/src/test/org/apache/hadoo...
Date: Tue, 12 Oct 2010 18:23:36 -0000
To: mapreduce-commits@hadoop.apache.org
From: schen@apache.org
Message-Id: <20101012182339.E423E23889E9@eris.apache.org>

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java?rev=1021873&r1=1021872&r2=1021873&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java (original) +++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java Tue Oct 12 18:23:36 2010 @@ -21,10 +21,12 @@ package org.apache.hadoop.raid; import java.io.IOException; import java.io.FileNotFoundException; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.LinkedList; import java.util.Iterator; import java.util.Arrays; +import java.util.Map; import java.util.Random; import java.util.Set; import java.util.HashSet; @@ -73,7 +75,9 @@ public class RaidNode implements RaidPro public static final long SLEEP_TIME = 10000L; // 10 seconds public static final int DEFAULT_PORT = 60000; public static final int DEFAULT_STRIPE_LENGTH = 5; // default value of stripe length + public static final String STRIPE_LENGTH_KEY = "hdfs.raid.stripeLength"; public static final String DEFAULT_RAID_LOCATION = "/raid"; + public static final String RAID_LOCATION_KEY = "hdfs.raid.locations"; public static final String HAR_SUFFIX = "_raid.har"; /** RPC server */ @@ -101,6 +105,10 @@ public class RaidNode implements RaidPro /** Deamon thread to har raid directories */ Daemon harThread = null; + /** Daemon thread to monitor distributed raid job
progress */ + JobMonitor jobMonitor = null; + Daemon jobMonitorThread = null; + /** Do do distributed raiding */ boolean isRaidLocal = false; @@ -168,6 +176,7 @@ public class RaidNode implements RaidPro try { initialize(conf); } catch (IOException e) { + LOG.error(StringUtils.stringifyException(e)); this.stop(); throw e; } catch (Exception e) { @@ -193,6 +202,7 @@ public class RaidNode implements RaidPro try { if (server != null) server.join(); if (triggerThread != null) triggerThread.join(); + if (jobMonitorThread != null) jobMonitorThread.join(); if (purgeThread != null) purgeThread.join(); } catch (InterruptedException ie) { // do nothing @@ -210,6 +220,8 @@ public class RaidNode implements RaidPro running = false; if (server != null) server.stop(); if (triggerThread != null) triggerThread.interrupt(); + if (jobMonitor != null) jobMonitor.running = false; + if (jobMonitorThread != null) jobMonitorThread.interrupt(); if (purgeThread != null) purgeThread.interrupt(); } @@ -252,6 +264,10 @@ public class RaidNode implements RaidPro running = true; this.server.start(); // start RPC server + this.jobMonitor = new JobMonitor(conf); + this.jobMonitorThread = new Daemon(this.jobMonitor); + this.jobMonitorThread.start(); + // start the deamon thread to fire polcies appropriately this.triggerThread = new Daemon(new TriggerMonitor()); this.triggerThread.start(); @@ -282,22 +298,15 @@ public class RaidNode implements RaidPro LOG.info("Recover File for " + inStr + " for corrupt offset " + corruptOffset); Path inputPath = new Path(inStr); Path srcPath = inputPath.makeQualified(inputPath.getFileSystem(conf)); - PolicyInfo info = findMatchingPolicy(srcPath); - if (info != null) { - - // find stripe length from config - int stripeLength = getStripeLength(conf, info); + // find stripe length from config + int stripeLength = getStripeLength(conf); - // create destination path prefix - String destPrefix = getDestinationPath(conf, info); - Path destPath = new Path(destPrefix.trim()); - FileSystem fs = FileSystem.get(destPath.toUri(), conf); - destPath = destPath.makeQualified(fs); - - Path unraided = unRaid(conf, srcPath, destPath, stripeLength, corruptOffset); - if (unraided != null) { - return unraided.toString(); - } + Path destPref = getDestinationPath(conf); + Decoder decoder = new XORDecoder(conf, RaidNode.getStripeLength(conf)); + Path unraided = unRaid(conf, srcPath, destPref, decoder, + stripeLength, corruptOffset); + if (unraided != null) { + return unraided.toString(); } return null; } @@ -306,6 +315,11 @@ public class RaidNode implements RaidPro * Periodically checks to see which policies should be fired. */ class TriggerMonitor implements Runnable { + + private Map scanTimes = new HashMap(); + private Map scanState = + new HashMap(); + /** */ public void run() { @@ -320,6 +334,109 @@ public class RaidNode implements RaidPro } } + /** + * Should we select more files for a policy. + */ + private boolean shouldSelectFiles(PolicyInfo info) { + String policyName = info.getName(); + int runningJobsCount = jobMonitor.runningJobsCount(policyName); + // Is there a scan in progress for this policy? + if (scanState.containsKey(policyName)) { + int maxJobsPerPolicy = configMgr.getMaxJobsPerPolicy(); + + // If there is a scan in progress for this policy, we can have + // upto maxJobsPerPolicy running jobs. + return (runningJobsCount < maxJobsPerPolicy); + } else { + // If there isn't a scan in progress for this policy, we don't + // want to start a fresh scan if there is even one running job. 
+ if (runningJobsCount >= 1) { + return false; + } + // Check the time of the last full traversal before starting a fresh + // traversal. + if (scanTimes.containsKey(policyName)) { + long lastScan = scanTimes.get(policyName); + return (now() > lastScan + configMgr.getPeriodicity()); + } else { + return true; + } + } + } + + /** + * Returns a list of pathnames that needs raiding. + * The list of paths could be obtained by resuming a previously suspended + * traversal. + * The number of paths returned is limited by raid.distraid.max.jobs. + */ + private List selectFiles(PolicyInfo info) throws IOException { + Path destPrefix = getDestinationPath(conf); + String policyName = info.getName(); + Path srcPath = info.getSrcPath(); + long modTimePeriod = 0; + String str = info.getProperty("modTimePeriod"); + if (str != null) { + modTimePeriod = Long.parseLong(info.getProperty("modTimePeriod")); + } + short srcReplication = 0; + str = info.getProperty("srcReplication"); + if (str != null) { + srcReplication = Short.parseShort(info.getProperty("srcReplication")); + } + + // Max number of files returned. + int selectLimit = configMgr.getMaxFilesPerJob(); + int targetRepl = Integer.parseInt(info.getProperty("targetReplication")); + + // If we have a pending traversal, resume it. + if (scanState.containsKey(policyName)) { + DirectoryTraversal dt = scanState.get(policyName); + LOG.info("Resuming traversal for policy " + policyName); + List returnSet = dt.selectFilesToRaid( + conf, targetRepl, destPrefix, modTimePeriod, selectLimit); + if (dt.doneTraversal()) { + scanState.remove(policyName); + } + return returnSet; + } + + // Expand destination prefix path. + String destpstr = destPrefix.toString(); + if (!destpstr.endsWith(Path.SEPARATOR)) { + destpstr += Path.SEPARATOR; + } + + List returnSet = new LinkedList(); + + FileSystem fs = srcPath.getFileSystem(conf); + FileStatus[] gpaths = fs.globStatus(srcPath); + if (gpaths != null) { + List selectedPaths = new LinkedList(); + for (FileStatus onepath: gpaths) { + String pathstr = onepath.getPath().makeQualified(fs).toString(); + if (!pathstr.endsWith(Path.SEPARATOR)) { + pathstr += Path.SEPARATOR; + } + if (pathstr.startsWith(destpstr) || destpstr.startsWith(pathstr)) { + LOG.info("Skipping source " + pathstr + + " because it conflicts with raid directory " + destpstr); + } else { + selectedPaths.add(onepath); + } + } + + // Set the time for a new traversal. + scanTimes.put(policyName, now()); + DirectoryTraversal dt = new DirectoryTraversal(fs, selectedPaths); + returnSet = dt.selectFilesToRaid( + conf, targetRepl, destPrefix, modTimePeriod, selectLimit); + if (!dt.doneTraversal()) { + scanState.put(policyName, dt); + } + } + return returnSet; + } /** * Keep processing policies. 
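[Not part of the patch above: a minimal sketch of how the new batched, resumable selection could be driven. It assumes the DirectoryTraversal methods shown in this commit (the (FileSystem, List) constructor, selectFilesToRaid, doneTraversal), that selectFilesToRaid returns a List of FileStatus, and that the sketch lives in package org.apache.hadoop.raid so the package-level helpers are visible. The class name, source path, and numeric arguments are illustrative only.]

package org.apache.hadoop.raid;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class TraversalSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // The cluster-wide keys introduced by this patch; the values are examples.
    conf.setInt(RaidNode.STRIPE_LENGTH_KEY, 5);         // hdfs.raid.stripeLength
    conf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");  // hdfs.raid.locations
    Path destPrefix = RaidNode.getDestinationPath(conf);

    Path srcGlob = new Path("/user/foo/raidtest");      // example source path
    FileSystem fs = srcGlob.getFileSystem(conf);
    FileStatus[] matches = fs.globStatus(srcGlob);
    if (matches == null) {
      return;                                           // nothing to scan
    }

    DirectoryTraversal dt = new DirectoryTraversal(fs, Arrays.asList(matches));
    while (!dt.doneTraversal()) {
      // Each call returns at most the last argument's worth of files; the
      // traversal keeps its position, so a later call resumes the same scan.
      // This is what TriggerMonitor relies on when it parks a traversal in
      // scanState and picks it up again on the next pass.
      List batch = dt.selectFilesToRaid(conf, 2, destPrefix, 3600000L, 100);
      System.out.println("selected " + batch.size() + " files in this batch");
      // In the daemon, each batch becomes local raiding or a DistRaid job.
    }
  }
}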
@@ -328,18 +445,11 @@ public class RaidNode implements RaidPro private void doProcess() throws IOException, InterruptedException { PolicyList.CompareByPath lexi = new PolicyList.CompareByPath(); - long prevExec = 0; - DistRaid dr = null; while (running) { + Thread.sleep(SLEEP_TIME); - boolean reload = configMgr.reloadConfigsIfNecessary(); - while(!reload && now() < prevExec + configMgr.getPeriodicity()){ - Thread.sleep(SLEEP_TIME); - reload = configMgr.reloadConfigsIfNecessary(); - } + configMgr.reloadConfigsIfNecessary(); - prevExec = now(); - // activate all categories Collection all = configMgr.getAllPolicies(); @@ -348,35 +458,18 @@ public class RaidNode implements RaidPro PolicyList[] sorted = all.toArray(new PolicyList[all.size()]); Arrays.sort(sorted, lexi); - if (!isRaidLocal) { - dr = new DistRaid(conf); - } - // paths we have processed so far - List processed = new LinkedList(); - for (PolicyList category : sorted) { for (PolicyInfo info: category.getAll()) { - long modTimePeriod = 0; - short srcReplication = 0; - String str = info.getProperty("modTimePeriod"); - if (str != null) { - modTimePeriod = Long.parseLong(info.getProperty("modTimePeriod")); - } - str = info.getProperty("srcReplication"); - if (str != null) { - srcReplication = Short.parseShort(info.getProperty("srcReplication")); + if (!shouldSelectFiles(info)) { + continue; } LOG.info("Triggering Policy Filter " + info.getName() + " " + info.getSrcPath()); List filteredPaths = null; - try { - filteredPaths = selectFiles(conf, info.getSrcPath(), - getDestinationPath(conf, info), - modTimePeriod, - srcReplication, - prevExec); + try { + filteredPaths = selectFiles(info); } catch (Exception e) { LOG.info("Exception while invoking filter on policy " + info.getName() + " srcPath " + info.getSrcPath() + @@ -389,95 +482,41 @@ public class RaidNode implements RaidPro continue; } - // If any of the filtered path has already been accepted - // by a previous policy, then skip it. - for (Iterator iter = filteredPaths.iterator(); iter.hasNext();) { - String fs = iter.next().getPath().toString() + "/"; - for (String p : processed) { - if (p.startsWith(fs)) { - iter.remove(); - break; - } - } - } - // Apply the action on accepted paths - LOG.info("Triggering Policy Action " + info.getName()); + LOG.info("Triggering Policy Action " + info.getName() + + " " + info.getSrcPath()); try { - if (isRaidLocal){ - doRaid(conf, info, filteredPaths); - } - else{ - //add paths for distributed raiding - dr.addRaidPaths(info, filteredPaths); - } + if (isRaidLocal){ + doRaid(conf, info, filteredPaths); + } + else{ + // We already checked that no job for this policy is running + // So we can start a new job. + DistRaid dr = new DistRaid(conf); + //add paths for distributed raiding + dr.addRaidPaths(info, filteredPaths); + boolean started = dr.startDistRaid(); + if (started) { + jobMonitor.monitorJob(info.getName(), dr); + } + } } catch (Exception e) { LOG.info("Exception while invoking action on policy " + info.getName() + " srcPath " + info.getSrcPath() + " exception " + StringUtils.stringifyException(e)); continue; } - - // add these paths to processed paths - for (Iterator iter = filteredPaths.iterator(); iter.hasNext();) { - String p = iter.next().getPath().toString() + "/"; - processed.add(p); - } } } - processed.clear(); // free up memory references before yielding - - //do the distributed raiding - if (!isRaidLocal) { - dr.doDistRaid(); - } } } } - /** - * Returns the policy that matches the specified path. 
- * The method below finds the first policy that matches an input path. Since different - * policies with different purposes and destinations might be associated with the same input - * path, we should be skeptical about the places using the method and we should try to change - * the code to avoid it. - */ - private PolicyInfo findMatchingPolicy(Path inpath) throws IOException { - PolicyList.CompareByPath lexi = new PolicyList.CompareByPath(); - Collection all = configMgr.getAllPolicies(); - - // sort all policies by reverse lexicographical order. This is needed - // to make the nearest policy take precedence. - PolicyList[] sorted = all.toArray(new PolicyList[all.size()]); - Arrays.sort(sorted, lexi); - - // loop through all categories of policies. - for (PolicyList category : sorted) { - PolicyInfo first = category.getAll().iterator().next(); - if (first != null) { - Path[] srcPaths = first.getSrcPathExpanded(); // input src paths unglobbed - if (srcPaths == null) { - continue; - } - - for (Path src: srcPaths) { - if (inpath.toString().startsWith(src.toString())) { - // if the srcpath is a prefix of the specified path - // we have a match! - return first; - } - } - } - } - return null; // no matching policies - } - - static private Path getOriginalParityFile(Path destPathPrefix, Path srcPath) { return new Path(destPathPrefix, makeRelative(srcPath)); } - private static class ParityFilePair { + static class ParityFilePair { private Path path; private FileSystem fs; @@ -506,11 +545,19 @@ public class RaidNode implements RaidPro * @return Path object representing the parity file of the source * @throws IOException */ - static private ParityFilePair getParityFile(Path destPathPrefix, Path srcPath, Configuration conf) throws IOException { + static ParityFilePair getParityFile(Path destPathPrefix, Path srcPath, Configuration conf) throws IOException { Path srcParent = srcPath.getParent(); FileSystem fsDest = destPathPrefix.getFileSystem(conf); - + FileSystem fsSrc = srcPath.getFileSystem(conf); + + FileStatus srcStatus = null; + try { + srcStatus = fsSrc.getFileStatus(srcPath); + } catch (java.io.FileNotFoundException e) { + return null; + } + Path outDir = destPathPrefix; if (srcParent != null) { if (srcParent.getParent() == null) { @@ -520,36 +567,36 @@ public class RaidNode implements RaidPro } } + + //CASE 1: CHECK HAR - Must be checked first because har is created after + // parity file and returning the parity file could result in error while + // reading it. + Path outPath = getOriginalParityFile(destPathPrefix, srcPath); String harDirName = srcParent.getName() + HAR_SUFFIX; Path HarPath = new Path(outDir,harDirName); - Path outPath = getOriginalParityFile(destPathPrefix, srcPath); - - if (!fsDest.exists(HarPath)) { // case 1: no HAR file - return new ParityFilePair(outPath,fsDest); - } - - URI HarPathUri = HarPath.toUri(); - Path inHarPath = new Path("har://",HarPathUri.getPath()+"/"+outPath.toUri().getPath()); - FileSystem fsHar = new HarFileSystem(fsDest); - fsHar.initialize(inHarPath.toUri(), conf); - - if (!fsHar.exists(inHarPath)) { // case 2: no file inside HAR - return new ParityFilePair(outPath,fsDest); - } - - if (! 
fsDest.exists(outPath)) { // case 3: only inside HAR - return new ParityFilePair(inHarPath,fsHar); + if (fsDest.exists(HarPath)) { + URI HarPathUri = HarPath.toUri(); + Path inHarPath = new Path("har://",HarPathUri.getPath()+"/"+outPath.toUri().getPath()); + FileSystem fsHar = new HarFileSystem(fsDest); + fsHar.initialize(inHarPath.toUri(), conf); + if (fsHar.exists(inHarPath)) { + FileStatus inHar = fsHar.getFileStatus(inHarPath); + if (inHar.getModificationTime() == srcStatus.getModificationTime()) { + return new ParityFilePair(inHarPath,fsHar); + } + } } - - // both inside and outside HAR. Should return most recent - FileStatus inHar = fsHar.getFileStatus(inHarPath); - FileStatus outHar = fsDest.getFileStatus(outPath); - - if (inHar.getModificationTime() >= outHar.getModificationTime()) { - return new ParityFilePair(inHarPath,fsHar); + + //CASE 2: CHECK PARITY + try { + FileStatus outHar = fsDest.getFileStatus(outPath); + if (outHar.getModificationTime() == srcStatus.getModificationTime()) { + return new ParityFilePair(outPath,fsDest); + } + } catch (java.io.FileNotFoundException e) { } - return new ParityFilePair(outPath,fsDest); + return null; // NULL if no parity file } private ParityFilePair getParityFile(Path destPathPrefix, Path srcPath) throws IOException { @@ -558,108 +605,6 @@ public class RaidNode implements RaidPro } - /** - * Returns a list of pathnames that needs raiding. - */ - List selectFiles(Configuration conf, Path p, String destPrefix, - long modTimePeriod, short srcReplication, long now) throws IOException { - - List returnSet = new LinkedList(); - - // expand destination prefix path - Path destp = new Path(destPrefix.trim()); - FileSystem fs = FileSystem.get(destp.toUri(), conf); - destp = destp.makeQualified(fs); - - // Expand destination prefix path. - String destpstr = destp.toString(); - if (!destpstr.endsWith(Path.SEPARATOR)) { - destpstr += Path.SEPARATOR; - } - - fs = p.getFileSystem(conf); - FileStatus[] gpaths = fs.globStatus(p); - if (gpaths != null) { - for (FileStatus onepath: gpaths) { - String pathstr = onepath.getPath().makeQualified(fs).toString(); - if (!pathstr.endsWith(Path.SEPARATOR)) { - pathstr += Path.SEPARATOR; - } - if (pathstr.startsWith(destpstr) || destpstr.startsWith(pathstr)) { - LOG.info("Skipping source " + pathstr + - " because it conflicts with raid directory " + destpstr); - } else { - recurse(fs, conf, destp, onepath, returnSet, modTimePeriod, srcReplication, now); - } - } - } - return returnSet; - } - - /** - * Pick files that need to be RAIDed. - */ - private void recurse(FileSystem srcFs, - Configuration conf, - Path destPathPrefix, - FileStatus src, - List accept, - long modTimePeriod, - short srcReplication, - long now) throws IOException { - Path path = src.getPath(); - FileStatus[] files = null; - try { - files = srcFs.listStatus(path); - } catch (java.io.FileNotFoundException e) { - // ignore error because the file could have been deleted by an user - LOG.info("FileNotFound " + path + " " + StringUtils.stringifyException(e)); - } catch (IOException e) { - throw e; - } - - // If the modTime of the raid file is later than the modtime of the - // src file and the src file has not been modified - // recently, then that file is a candidate for RAID. - - if (src.isFile()) { - - // if the source file has fewer than or equal to 2 blocks, then no need to RAID - long blockSize = src.getBlockSize(); - if (2 * blockSize >= src.getLen()) { - return; - } - - // check if destination path already exists. 
If it does and it's modification time - // does not match the modTime of the source file, then recalculate RAID - boolean add = false; - try { - ParityFilePair ppair = getParityFile(destPathPrefix, path); - Path outpath = ppair.getPath(); - FileSystem outFs = ppair.getFileSystem(); - FileStatus ostat = outFs.getFileStatus(outpath); - if (ostat.getModificationTime() != src.getModificationTime() && - src.getModificationTime() + modTimePeriod < now) { - add = true; - } - } catch (java.io.FileNotFoundException e) { - add = true; // destination file does not exist - } - - if (add) { - accept.add(src); - } - return; - - } else if (files != null) { - for (FileStatus one:files) { - if (!one.getPath().getName().endsWith(HAR_SUFFIX)){ - recurse(srcFs, conf, destPathPrefix, one, accept, modTimePeriod, srcReplication, now); - } - } - } - } - /** * RAID a list of files. @@ -668,8 +613,8 @@ public class RaidNode implements RaidPro throws IOException { int targetRepl = Integer.parseInt(info.getProperty("targetReplication")); int metaRepl = Integer.parseInt(info.getProperty("metaReplication")); - int stripeLength = getStripeLength(conf, info); - String destPrefix = getDestinationPath(conf, info); + int stripeLength = getStripeLength(conf); + Path destPref = getDestinationPath(conf); String simulate = info.getProperty("simulate"); boolean doSimulate = simulate == null ? false : Boolean .parseBoolean(simulate); @@ -677,13 +622,9 @@ public class RaidNode implements RaidPro Statistics statistics = new Statistics(); int count = 0; - Path p = new Path(destPrefix.trim()); - FileSystem fs = FileSystem.get(p.toUri(), conf); - p = p.makeQualified(fs); - for (FileStatus s : paths) { - doRaid(conf, s, p, statistics, null, doSimulate, targetRepl, metaRepl, - stripeLength); + doRaid(conf, s, destPref, statistics, Reporter.NULL, doSimulate, targetRepl, + metaRepl, stripeLength); if (count % 1000 == 0) { LOG.info("RAID statistics " + statistics.toString()); } @@ -701,23 +642,16 @@ public class RaidNode implements RaidPro FileStatus src, Statistics statistics, Reporter reporter) throws IOException { int targetRepl = Integer.parseInt(info.getProperty("targetReplication")); int metaRepl = Integer.parseInt(info.getProperty("metaReplication")); - int stripeLength = getStripeLength(conf, info); - String destPrefix = getDestinationPath(conf, info); + int stripeLength = getStripeLength(conf); + Path destPref = getDestinationPath(conf); String simulate = info.getProperty("simulate"); boolean doSimulate = simulate == null ? 
false : Boolean .parseBoolean(simulate); - int count = 0; - - Path p = new Path(destPrefix.trim()); - FileSystem fs = FileSystem.get(p.toUri(), conf); - p = p.makeQualified(fs); - - doRaid(conf, src, p, statistics, reporter, doSimulate, targetRepl, metaRepl, - stripeLength); + doRaid(conf, src, destPref, statistics, reporter, doSimulate, + targetRepl, metaRepl, stripeLength); } - - + /** * RAID an individual file */ @@ -784,25 +718,11 @@ public class RaidNode implements RaidPro Path destPathPrefix, BlockLocation[] locations, int metaRepl, int stripeLength) throws IOException { - // two buffers for generating parity - Random rand = new Random(); - int bufSize = 5 * 1024 * 1024; // 5 MB - byte[] bufs = new byte[bufSize]; - byte[] xor = new byte[bufSize]; - Path inpath = stat.getPath(); - long blockSize = stat.getBlockSize(); - long fileSize = stat.getLen(); - - // create output tmp path Path outpath = getOriginalParityFile(destPathPrefix, inpath); FileSystem outFs = outpath.getFileSystem(conf); - - Path tmppath = new Path(conf.get("fs.raid.tmpdir", "/tmp/raid") + - outpath.toUri().getPath() + "." + - rand.nextLong() + ".tmp"); - // if the parity file is already upto-date, then nothing to do + // If the parity file is already upto-date, then nothing to do try { FileStatus stmp = outFs.getFileStatus(outpath); if (stmp.getModificationTime() == stat.getModificationTime()) { @@ -812,66 +732,11 @@ public class RaidNode implements RaidPro } } catch (IOException e) { // ignore errors because the raid file might not exist yet. - } - - LOG.info("Parity file for " + inpath + "(" + locations.length + ") is " + outpath); - FSDataOutputStream out = outFs.create(tmppath, - true, - conf.getInt("io.file.buffer.size", 64 * 1024), - (short)metaRepl, - blockSize); - - try { - - // loop once for every stripe length - for (int startBlock = 0; startBlock < locations.length;) { - - // report progress to Map-reduce framework - if (reporter != null) { - reporter.progress(); - } - int blocksLeft = locations.length - startBlock; - int stripe = Math.min(stripeLength, blocksLeft); - LOG.info(" startBlock " + startBlock + " stripe " + stripe); - - // open a new file descriptor for each block in this stripe. - // make each fd point to the beginning of each block in this stripe. - FSDataInputStream[] ins = new FSDataInputStream[stripe]; - for (int i = 0; i < stripe; i++) { - ins[i] = inFs.open(inpath, bufSize); - ins[i].seek(blockSize * (startBlock + i)); - } - - generateParity(ins,out,blockSize,bufs,xor, reporter); - - // close input file handles - for (int i = 0; i < ins.length; i++) { - ins[i].close(); - } - - // increment startBlock to point to the first block to be processed - // in the next iteration - startBlock += stripe; - } - out.close(); - out = null; - - // delete destination if exists - if (outFs.exists(outpath)){ - outFs.delete(outpath, false); - } - // rename tmppath to the real parity filename - outFs.mkdirs(outpath.getParent()); - if (!outFs.rename(tmppath, outpath)) { - String msg = "Unable to rename tmp file " + tmppath + " to " + outpath; - LOG.warn(msg); - throw new IOException (msg); - } - } finally { - // remove the tmp file if it still exists - outFs.delete(tmppath, false); } + XOREncoder encoder = new XOREncoder(conf, stripeLength); + encoder.encodeFile(inFs, inpath, outpath, (short)metaRepl, reporter); + // set the modification time of the RAID file. This is done so that the modTime of the // RAID file reflects that contents of the source file that it has RAIDed. 
This should // also work for files that are being appended to. This is necessary because the time on @@ -880,255 +745,39 @@ public class RaidNode implements RaidPro outFs.setTimes(outpath, stat.getModificationTime(), -1); FileStatus outstat = outFs.getFileStatus(outpath); - LOG.info("Source file " + inpath + " of size " + fileSize + + FileStatus inStat = inFs.getFileStatus(inpath); + LOG.info("Source file " + inpath + " of size " + inStat.getLen() + " Parity file " + outpath + " of size " + outstat.getLen() + " src mtime " + stat.getModificationTime() + " parity mtime " + outstat.getModificationTime()); } - private static int readInputUntilEnd(FSDataInputStream ins, byte[] bufs, int toRead) - throws IOException { - - int tread = 0; - - while (tread < toRead) { - int read = ins.read(bufs, tread, toRead - tread); - if (read == -1) { - return tread; - } else { - tread += read; - } - } - - return tread; - } - - private static void generateParity(FSDataInputStream[] ins, FSDataOutputStream fout, - long parityBlockSize, byte[] bufs, byte[] xor, Reporter reporter) throws IOException { - - int bufSize; - if ((bufs == null) || (bufs.length == 0)){ - bufSize = 5 * 1024 * 1024; // 5 MB - bufs = new byte[bufSize]; - } else { - bufSize = bufs.length; - } - if ((xor == null) || (xor.length != bufs.length)){ - xor = new byte[bufSize]; - } - - int xorlen = 0; - - // this loop processes all good blocks in selected stripe - long remaining = parityBlockSize; - - while (remaining > 0) { - int toRead = (int)Math.min(remaining, bufSize); - - if (ins.length > 0) { - xorlen = readInputUntilEnd(ins[0], xor, toRead); - } - - // read all remaining blocks and xor them into the buffer - for (int i = 1; i < ins.length; i++) { - - // report progress to Map-reduce framework - if (reporter != null) { - reporter.progress(); - } - - int actualRead = readInputUntilEnd(ins[i], bufs, toRead); - - int j; - int xorlimit = (int) Math.min(xorlen,actualRead); - for (j = 0; j < xorlimit; j++) { - xor[j] ^= bufs[j]; - } - if ( actualRead > xorlen ){ - for (; j < actualRead; j++) { - xor[j] = bufs[j]; - } - xorlen = actualRead; - } - - } - - if (xorlen < toRead) { - Arrays.fill(bufs, xorlen, toRead, (byte) 0); - } - - // write this to the tmp file - fout.write(xor, 0, toRead); - remaining -= toRead; - } - - } - /** - * Extract a good block from the parity block. This assumes that the corruption - * is in the main file and the parity file is always good. + * Extract a good block from the parity block. This assumes that the + * corruption is in the main file and the parity file is always good. 
*/ - public static Path unRaid(Configuration conf, Path srcPath, Path destPathPrefix, - int stripeLength, long corruptOffset) throws IOException { - - // extract block locations, size etc from source file - Random rand = new Random(); - FileSystem srcFs = srcPath.getFileSystem(conf); - FileStatus srcStat = srcFs.getFileStatus(srcPath); - long blockSize = srcStat.getBlockSize(); - long fileSize = srcStat.getLen(); - - // find the stripe number where the corrupted offset lies - long snum = corruptOffset / (stripeLength * blockSize); - long startOffset = snum * stripeLength * blockSize; - long corruptBlockInStripe = (corruptOffset - startOffset)/blockSize; - long corruptBlockSize = Math.min(blockSize, fileSize - startOffset); - - LOG.info("Start offset of relevent stripe = " + startOffset + - " corruptBlockInStripe " + corruptBlockInStripe); - - // open file descriptors to read all good blocks of the file - FSDataInputStream[] instmp = new FSDataInputStream[stripeLength]; - int numLength = 0; - for (int i = 0; i < stripeLength; i++) { - if (i == corruptBlockInStripe) { - continue; // do not open corrupt block - } - if (startOffset + i * blockSize >= fileSize) { - LOG.info("Stop offset of relevent stripe = " + - startOffset + i * blockSize); - break; - } - instmp[numLength] = srcFs.open(srcPath); - instmp[numLength].seek(startOffset + i * blockSize); - numLength++; - } - - // create array of inputstream, allocate one extra slot for - // parity file. numLength could be smaller than stripeLength - // if we are processing the last partial stripe on a file. - numLength += 1; - FSDataInputStream[] ins = new FSDataInputStream[numLength]; - for (int i = 0; i < numLength-1; i++) { - ins[i] = instmp[i]; - } - LOG.info("Decompose a total of " + numLength + " blocks."); - - // open and seek to the appropriate offset in parity file. - ParityFilePair ppair = getParityFile(destPathPrefix, srcPath, conf); - Path parityFile = ppair.getPath(); - FileSystem parityFs = ppair.getFileSystem(); - LOG.info("Parity file for " + srcPath + " is " + parityFile); - ins[numLength-1] = parityFs.open(parityFile); - ins[numLength-1].seek(snum * blockSize); - LOG.info("Parity file " + parityFile + - " seeking to relevent block at offset " + - ins[numLength-1].getPos()); - - // create a temporary filename in the source filesystem - // do not overwrite an existing tmp file. Make it fail for now. - // We need to generate a unique name for this tmp file later on. 
- Path tmpFile = null; - FSDataOutputStream fout = null; - FileSystem destFs = destPathPrefix.getFileSystem(conf); - int retry = 5; - try { - tmpFile = new Path(conf.get("fs.raid.tmpdir", "/tmp/raid") + "/" + - rand.nextInt()); - fout = destFs.create(tmpFile, false); - } catch (IOException e) { - if (retry-- <= 0) { - LOG.info("Unable to create temporary file " + tmpFile + - " Aborting...."); - throw e; - } - LOG.info("Unable to create temporary file " + tmpFile + - "Retrying...."); - } - LOG.info("Created recovered block file " + tmpFile); - - // buffers for generating parity bits - int bufSize = 5 * 1024 * 1024; // 5 MB - byte[] bufs = new byte[bufSize]; - byte[] xor = new byte[bufSize]; - - generateParity(ins,fout,corruptBlockSize,bufs,xor,null); - - // close all files - fout.close(); - for (int i = 0; i < ins.length; i++) { - ins[i].close(); + public static Path unRaid(Configuration conf, Path srcPath, + Path destPathPrefix, Decoder decoder, int stripeLength, + long corruptOffset) throws IOException { + + // Test if parity file exists + ParityFilePair ppair = getParityFile(destPathPrefix, srcPath, conf); + if (ppair == null) { + return null; } - // Now, reopen the source file and the recovered block file - // and copy all relevant data to new file final Path recoveryDestination = new Path(conf.get("fs.raid.tmpdir", "/tmp/raid")); + FileSystem destFs = recoveryDestination.getFileSystem(conf); final Path recoveredPrefix = destFs.makeQualified(new Path(recoveryDestination, makeRelative(srcPath))); final Path recoveredPath = - new Path(recoveredPrefix + "." + rand.nextLong() + ".recovered"); + new Path(recoveredPrefix + "." + new Random().nextLong() + ".recovered"); LOG.info("Creating recovered file " + recoveredPath); - FSDataInputStream sin = srcFs.open(srcPath); - FSDataOutputStream out = destFs.create(recoveredPath, false, - conf.getInt("io.file.buffer.size", 64 * 1024), - srcStat.getReplication(), - srcStat.getBlockSize()); - - FSDataInputStream bin = destFs.open(tmpFile); - long recoveredSize = 0; - - // copy all the good blocks (upto the corruption) - // from source file to output file - long remaining = corruptOffset / blockSize * blockSize; - while (remaining > 0) { - int toRead = (int)Math.min(remaining, bufSize); - sin.readFully(bufs, 0, toRead); - out.write(bufs, 0, toRead); - remaining -= toRead; - recoveredSize += toRead; - } - LOG.info("Copied upto " + recoveredSize + " from src file. "); - - // copy recovered block to output file - remaining = corruptBlockSize; - while (recoveredSize < fileSize && - remaining > 0) { - int toRead = (int)Math.min(remaining, bufSize); - bin.readFully(bufs, 0, toRead); - out.write(bufs, 0, toRead); - remaining -= toRead; - recoveredSize += toRead; - } - LOG.info("Copied upto " + recoveredSize + " from recovered-block file. "); - - // skip bad block in src file - if (recoveredSize < fileSize) { - sin.seek(sin.getPos() + corruptBlockSize); - } - - // copy remaining good data from src file to output file - while (recoveredSize < fileSize) { - int toRead = (int)Math.min(fileSize - recoveredSize, bufSize); - sin.readFully(bufs, 0, toRead); - out.write(bufs, 0, toRead); - recoveredSize += toRead; - } - out.close(); - LOG.info("Completed writing " + recoveredSize + " bytes into " + - recoveredPath); - - sin.close(); - bin.close(); - - // delete the temporary block file that was created. 
- destFs.delete(tmpFile, false); - LOG.info("Deleted temporary file " + tmpFile); - - // copy the meta information from source path to the newly created - // recovered path - copyMetaInformation(destFs, srcStat, recoveredPath); + FileSystem srcFs = srcPath.getFileSystem(conf); + decoder.decodeFile(srcFs, srcPath, ppair.getFileSystem(), + ppair.getPath(), corruptOffset, recoveredPath); return recoveredPath; } @@ -1179,35 +828,22 @@ public class RaidNode implements RaidPro PolicyList[] sorted = all.toArray(new PolicyList[all.size()]); Arrays.sort(sorted, lexi); - // paths we have processed so far - Set processed = new HashSet(); - for (PolicyList category : sorted) { for (PolicyInfo info: category.getAll()) { try { // expand destination prefix path - String destinationPrefix = getDestinationPath(conf, info); - Path destPref = new Path(destinationPrefix.trim()); - FileSystem destFs = FileSystem.get(destPref.toUri(), conf); - destPref = destFs.makeQualified(destPref); + Path destPref = getDestinationPath(conf); + FileSystem destFs = destPref.getFileSystem(conf); //get srcPaths Path[] srcPaths = info.getSrcPathExpanded(); - if ( srcPaths != null ){ + if (srcPaths != null) { for (Path srcPath: srcPaths) { // expand destination prefix Path destPath = getOriginalParityFile(destPref, srcPath); - // if this destination path has already been processed as part - // of another policy, then nothing more to do - if (processed.contains(destPath)) { - LOG.info("Obsolete parity files for policy " + - info.getName() + " has already been procesed."); - continue; - } - FileSystem srcFs = info.getSrcPath().getFileSystem(conf); FileStatus stat = null; try { @@ -1221,12 +857,8 @@ public class RaidNode implements RaidPro recursePurge(srcFs, destFs, destPref.toUri().getPath(), stat); } - // this destination path has already been processed - processed.add(destPath); - } } - } catch (Exception e) { LOG.warn("Ignoring Exception while processing policy " + info.getName() + " " + @@ -1342,10 +974,8 @@ public class RaidNode implements RaidPro try { long cutoff = now() - ( Long.parseLong(str) * 24L * 3600000L ); - String destinationPrefix = getDestinationPath(conf, info); - Path destPref = new Path(destinationPrefix.trim()); + Path destPref = getDestinationPath(conf); FileSystem destFs = destPref.getFileSystem(conf); - destPref = destFs.makeQualified(destPref); //get srcPaths Path[] srcPaths = info.getSrcPathExpanded(); @@ -1407,7 +1037,11 @@ public class RaidNode implements RaidPro recurseHar(info, destFs, one, destPrefix, srcFs, cutoff, tmpHarPath); shouldHar = false; } else if (one.getModificationTime() > cutoff ) { - shouldHar = false; + if (shouldHar) { + LOG.info("Cannot archive " + destPath + + " because " + one.getPath() + " was modified after cutoff"); + shouldHar = false; + } } } @@ -1433,6 +1067,7 @@ public class RaidNode implements RaidPro } if ( shouldHar ) { + LOG.info("Archiving " + dest.getPath() + " to " + tmpHarPath ); singleHar(destFs, dest, tmpHarPath); } } @@ -1493,56 +1128,25 @@ public class RaidNode implements RaidPro LOG.info("Leaving Har thread."); } - - } - - /** - * If the config file has an entry for hdfs.raid.locations, then that overrides - * destination path specified in the raid policy file - */ - static private String getDestinationPath(Configuration conf, PolicyInfo info) { - String locs = conf.get("hdfs.raid.locations"); - if (locs != null) { - return locs; - } - locs = info.getDestinationPath(); - if (locs == null) { - return DEFAULT_RAID_LOCATION; - } - return locs; } /** - * If the 
config file has an entry for hdfs.raid.stripeLength, then use that - * specified in the raid policy file + * Return the path prefix that stores the parity files */ - static private int getStripeLength(Configuration conf, PolicyInfo info) - throws IOException { - int len = conf.getInt("hdfs.raid.stripeLength", 0); - if (len != 0) { - return len; - } - String str = info.getProperty("stripeLength"); - if (str == null) { - String msg = "hdfs.raid.stripeLength is not defined." + - " Using a default " + DEFAULT_STRIPE_LENGTH; - LOG.info(msg); - return DEFAULT_STRIPE_LENGTH; - } - return Integer.parseInt(str); + static Path getDestinationPath(Configuration conf) + throws IOException { + String loc = conf.get(RAID_LOCATION_KEY, DEFAULT_RAID_LOCATION); + Path p = new Path(loc.trim()); + FileSystem fs = FileSystem.get(p.toUri(), conf); + p = p.makeQualified(fs); + return p; } /** - * Copy the file owner, modtime, etc from srcPath to the recovered Path. - * It is possiible that we might have to retrieve file persmissions, - * quotas, etc too in future. + * Obtain stripe length from configuration */ - static private void copyMetaInformation(FileSystem fs, FileStatus stat, - Path recoveredPath) - throws IOException { - fs.setOwner(recoveredPath, stat.getOwner(), stat.getGroup()); - fs.setPermission(recoveredPath, stat.getPermission()); - fs.setTimes(recoveredPath, stat.getModificationTime(), stat.getAccessTime()); + public static int getStripeLength(Configuration conf) { + return conf.getInt(STRIPE_LENGTH_KEY, DEFAULT_STRIPE_LENGTH); } /** Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java?rev=1021873&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java (added) +++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java Tue Oct 12 18:23:36 2010 @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.raid; + +import java.io.InputStream; +import java.io.OutputStream; +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.io.IOUtils; + +public class RaidUtils { + public static void readTillEnd(InputStream in, byte[] buf, boolean eofOK) + throws IOException { + int toRead = buf.length; + int numRead = 0; + while (numRead < toRead) { + int nread = in.read(buf, numRead, toRead - numRead); + if (nread < 0) { + if (eofOK) { + // EOF hit, fill with zeros + Arrays.fill(buf, numRead, toRead, (byte)0); + numRead = toRead; + } else { + // EOF hit, throw. + throw new IOException("Premature EOF"); + } + } else { + numRead += nread; + } + } + } + + public static void copyBytes( + InputStream in, OutputStream out, byte[] buf, long count) + throws IOException { + for (long bytesRead = 0; bytesRead < count; ) { + int toRead = Math.min(buf.length, (int)(count - bytesRead)); + IOUtils.readFully(in, buf, 0, toRead); + bytesRead += toRead; + out.write(buf, 0, toRead); + } + } + + public static class ZeroInputStream extends InputStream + implements Seekable, PositionedReadable { + private long endOffset; + private long pos; + + public ZeroInputStream(long endOffset) { + this.endOffset = endOffset; + this.pos = 0; + } + + @Override + public int read() throws IOException { + if (pos < endOffset) { + pos++; + return 0; + } + return -1; + } + + @Override + public int available() throws IOException { + return (int)(endOffset - pos); + } + + @Override + public long getPos() throws IOException { + return pos; + } + + @Override + public void seek(long seekOffset) throws IOException { + if (seekOffset < endOffset) { + pos = seekOffset; + } else { + throw new IOException("Illegal Offset" + pos); + } + } + + @Override + public boolean seekToNewSource(long targetPos) throws IOException { + return false; + } + + @Override + public int read(long position, byte[] buffer, int offset, int length) + throws IOException { + int count = 0; + for (; position < endOffset && count < length; position++) { + buffer[offset + count] = 0; + count++; + } + return count; + } + + @Override + public void readFully(long position, byte[] buffer, int offset, int length) + throws IOException { + int count = 0; + for (; position < endOffset && count < length; position++) { + buffer[offset + count] = 0; + count++; + } + if (count < length) { + throw new IOException("Premature EOF"); + } + } + + @Override + public void readFully(long position, byte[] buffer) throws IOException { + readFully(position, buffer, 0, buffer.length); + } + } +} Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XORDecoder.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XORDecoder.java?rev=1021873&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XORDecoder.java (added) +++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XORDecoder.java Tue Oct 12 18:23:36 2010 @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.raid; + +import java.io.OutputStream; +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; + +public class XORDecoder extends Decoder { + public static final Log LOG = LogFactory.getLog( + "org.apache.hadoop.raid.XORDecoder"); + + public XORDecoder( + Configuration conf, int stripeSize) { + super(conf, stripeSize, 1); + } + + @Override + protected void fixErasedBlock( + FileSystem fs, Path srcFile, FileSystem parityFs, Path parityFile, + long blockSize, long errorOffset, long bytesToSkip, long limit, + OutputStream out) throws IOException { + LOG.info("Fixing block at " + srcFile + ":" + errorOffset + + ", skipping " + bytesToSkip + ", limit " + limit); + FileStatus srcStat = fs.getFileStatus(srcFile); + ArrayList xorinputs = new ArrayList(); + + FSDataInputStream parityFileIn = parityFs.open(parityFile); + parityFileIn.seek(parityOffset(errorOffset, blockSize)); + xorinputs.add(parityFileIn); + + long errorBlockOffset = (errorOffset / blockSize) * blockSize; + long[] srcOffsets = stripeOffsets(errorOffset, blockSize); + for (int i = 0; i < srcOffsets.length; i++) { + if (srcOffsets[i] == errorBlockOffset) { + LOG.info("Skipping block at " + srcFile + ":" + errorBlockOffset); + continue; + } + if (srcOffsets[i] < srcStat.getLen()) { + FSDataInputStream in = fs.open(srcFile); + in.seek(srcOffsets[i]); + xorinputs.add(in); + } + } + FSDataInputStream[] inputs = xorinputs.toArray( + new FSDataInputStream[]{null}); + ParityInputStream recovered = + new ParityInputStream(inputs, limit, readBufs[0], writeBufs[0]); + recovered.skip(bytesToSkip); + recovered.drain(out, null); + } + + protected long[] stripeOffsets(long errorOffset, long blockSize) { + long[] offsets = new long[stripeSize]; + long stripeIdx = errorOffset / (blockSize * stripeSize); + long startOffsetOfStripe = stripeIdx * stripeSize * blockSize; + for (int i = 0; i < stripeSize; i++) { + offsets[i] = startOffsetOfStripe + i * blockSize; + } + return offsets; + } + + protected long parityOffset(long errorOffset, long blockSize) { + long stripeIdx = errorOffset / (blockSize * stripeSize); + return stripeIdx * blockSize; + } + +} Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java?rev=1021873&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java (added) +++ 
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java Tue Oct 12 18:23:36 2010 @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.raid; + +import java.io.OutputStream; +import java.io.InputStream; +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.Progressable; + +public class XOREncoder extends Encoder { + public static final Log LOG = LogFactory.getLog( + "org.apache.hadoop.raid.XOREncoder"); + public XOREncoder( + Configuration conf, int stripeSize) { + super(conf, stripeSize, 1); + } + + @Override + protected void encodeStripe( + InputStream[] blocks, + long stripeStartOffset, + long blockSize, + OutputStream[] outs, + Progressable reporter) throws IOException { + LOG.info("Peforming XOR "); + ParityInputStream parityIn = + new ParityInputStream(blocks, blockSize, readBufs[0], writeBufs[0]); + try { + parityIn.drain(outs[0], reporter); + } finally { + parityIn.close(); + } + } +} Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java?rev=1021873&r1=1021872&r2=1021873&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java (original) +++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java Tue Oct 12 18:23:36 2010 @@ -47,11 +47,14 @@ import org.apache.hadoop.fs.FSDataOutput import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedRaidFileSystem; import org.apache.hadoop.raid.RaidNode; +import org.apache.hadoop.raid.protocol.PolicyInfo; public class TestRaidDfs extends TestCase { final static String TEST_DIR = new File(System.getProperty("test.build.data", @@ -59,8 +62,7 @@ public class TestRaidDfs extends TestCas final static String CONFIG_FILE = new File(TEST_DIR, "test-raid.xml").getAbsolutePath(); final static long RELOAD_INTERVAL = 1000; - 
final static Log LOG = LogFactory.getLog("org.apache.hadoop.raid.TestRaidNode"); - final Random rand = new Random(); + final static Log LOG = LogFactory.getLog("org.apache.hadoop.raid.TestRaidDfs"); final static int NUM_DATANODES = 3; Configuration conf; @@ -83,6 +85,9 @@ public class TestRaidDfs extends TestCas // scan all policies once every 5 second conf.setLong("raid.policy.rescan.interval", 5000); + // make all deletions not go through Trash + conf.set("fs.shell.delete.classname", "org.apache.hadoop.hdfs.DFSClient"); + // do not use map-reduce cluster for Raiding conf.setBoolean("fs.raidnode.local", true); conf.set("raid.server.address", "localhost:0"); @@ -133,80 +138,148 @@ public class TestRaidDfs extends TestCas if (cnode != null) { cnode.stop(); cnode.join(); } if (dfs != null) { dfs.shutdown(); } } + + private LocatedBlocks getBlockLocations(Path file, long length) + throws IOException { + DistributedFileSystem dfs = (DistributedFileSystem) fileSys; + return dfs.getClient().namenode.getBlockLocations(file.toString(), 0, length); + } - /** - * Test DFS Raid - */ - public void testRaidDfs() throws Exception { - LOG.info("Test testRaidDfs started."); - long blockSize = 8192L; - int stripeLength = 3; - mySetup(); - Path file1 = new Path("/user/dhruba/raidtest/file1"); - Path destPath = new Path("/destraid/user/dhruba/raidtest"); - long crc1 = createOldFile(fileSys, file1, 1, 7, blockSize); - LOG.info("Test testPathFilter created test files"); + private LocatedBlocks getBlockLocations(Path file) + throws IOException { + FileStatus stat = fileSys.getFileStatus(file); + return getBlockLocations(file, stat.getLen()); + } - // create an instance of the RaidNode - cnode = RaidNode.createRaidNode(null, conf); - - try { - FileStatus[] listPaths = null; + private DistributedRaidFileSystem getRaidFS() throws IOException { + DistributedFileSystem dfs = (DistributedFileSystem)fileSys; + Configuration clientConf = new Configuration(conf); + clientConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedRaidFileSystem"); + clientConf.set("fs.raid.underlyingfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem"); + clientConf.setBoolean("fs.hdfs.impl.disable.cache", true); + URI dfsUri = dfs.getUri(); + return (DistributedRaidFileSystem)FileSystem.get(dfsUri, clientConf); + } - // wait till file is raided - while (listPaths == null || listPaths.length != 1) { - LOG.info("Test testPathFilter waiting for files to be raided."); - try { - listPaths = fileSys.listStatus(destPath); - } catch (FileNotFoundException e) { - //ignore + public static void waitForFileRaided( + Log logger, FileSystem fileSys, Path file, Path destPath) + throws IOException, InterruptedException { + FileStatus parityStat = null; + String fileName = file.getName().toString(); + // wait till file is raided + while (parityStat == null) { + logger.info("Waiting for files to be raided."); + try { + FileStatus[] listPaths = fileSys.listStatus(destPath); + if (listPaths != null) { + for (FileStatus f : listPaths) { + logger.info("File raided so far : " + f.getPath()); + String found = f.getPath().getName().toString(); + if (fileName.equals(found)) { + parityStat = f; + break; + } + } } - Thread.sleep(1000); // keep waiting + } catch (FileNotFoundException e) { + //ignore } - assertEquals(listPaths.length, 1); // all files raided - LOG.info("Files raided so far : " + listPaths[0].getPath()); + Thread.sleep(1000); // keep waiting + } - // extract block locations from File system. Wait till file is closed. 
+ while (true) { LocatedBlocks locations = null; DistributedFileSystem dfs = (DistributedFileSystem) fileSys; - while (true) { - locations = dfs.getClient().getNamenode().getBlockLocations(file1.toString(), - 0, listPaths[0].getLen()); - if (!locations.isUnderConstruction()) { - break; - } - Thread.sleep(1000); + locations = dfs.getClient().namenode.getBlockLocations( + file.toString(), 0, parityStat.getLen()); + if (!locations.isUnderConstruction()) { + break; } + Thread.sleep(1000); + } - // filter all filesystem calls from client - Configuration clientConf = new Configuration(conf); - clientConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedRaidFileSystem"); - clientConf.set("fs.raid.underlyingfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem"); - URI dfsUri = dfs.getUri(); - FileSystem.closeAll(); - FileSystem raidfs = FileSystem.get(dfsUri, clientConf); - - assertTrue("raidfs not an instance of DistributedRaidFileSystem",raidfs instanceof DistributedRaidFileSystem); - - LOG.info("Corrupt first block of file"); - corruptBlock(file1, locations.get(0).getBlock(), NUM_DATANODES, false); - validateFile(raidfs, file1, file1, crc1); + while (true) { + FileStatus stat = fileSys.getFileStatus(file); + if (stat.getReplication() == 1) break; + Thread.sleep(1000); + } + } + + private void corruptBlockAndValidate(Path srcFile, Path destPath, + int[] listBlockNumToCorrupt, long blockSize, int numBlocks) + throws IOException, InterruptedException { + int repl = 1; + long crc = createTestFilePartialLastBlock(fileSys, srcFile, repl, + numBlocks, blockSize); + long length = fileSys.getFileStatus(srcFile).getLen(); + + waitForFileRaided(LOG, fileSys, srcFile, destPath); + + // Delete first block of file + for (int blockNumToCorrupt : listBlockNumToCorrupt) { + LOG.info("Corrupt block " + blockNumToCorrupt + " of file " + srcFile); + LocatedBlocks locations = getBlockLocations(srcFile); + corruptBlock(srcFile, locations.get(blockNumToCorrupt).getBlock(), + NUM_DATANODES, true); + } + + // Validate + DistributedRaidFileSystem raidfs = getRaidFS(); + assertTrue(validateFile(raidfs, srcFile, length, crc)); + } + + /** + * Create a file, corrupt a block in it and ensure that the file can be + * read through DistributedRaidFileSystem. + */ + public void testRaidDfs() throws Exception { + LOG.info("Test testRaidDfs started."); + + long blockSize = 8192L; + int numBlocks = 8; + int repl = 1; + mySetup(); + + // Create an instance of the RaidNode + Configuration localConf = new Configuration(conf); + localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid"); + cnode = RaidNode.createRaidNode(null, localConf); + + Path file = new Path("/user/dhruba/raidtest/file"); + Path destPath = new Path("/destraid/user/dhruba/raidtest"); + int[][] corrupt = {{0}, {4}, {7}}; // first, last and middle block + try { + long crc = createTestFilePartialLastBlock(fileSys, file, repl, + numBlocks, blockSize); + long length = fileSys.getFileStatus(file).getLen(); + waitForFileRaided(LOG, fileSys, file, destPath); + LocatedBlocks locations = getBlockLocations(file); + + for (int i = 0; i < corrupt.length; i++) { + int blockNumToCorrupt = corrupt[i][0]; + LOG.info("Corrupt block " + blockNumToCorrupt + " of file"); + corruptBlock(file, locations.get(blockNumToCorrupt).getBlock(), + NUM_DATANODES, false); + validateFile(getRaidFS(), file, length, crc); + } // Corrupt one more block. This is expected to fail. 
- LOG.info("Corrupt second block of file"); - corruptBlock(file1, locations.get(1).getBlock(), NUM_DATANODES, false); + LOG.info("Corrupt one more block of file"); + corruptBlock(file, locations.get(1).getBlock(), NUM_DATANODES, false); try { - validateFile(raidfs, file1, file1, crc1); + validateFile(getRaidFS(), file, length, crc); fail("Expected exception ChecksumException not thrown!"); } catch (org.apache.hadoop.fs.ChecksumException e) { } } catch (Exception e) { - LOG.info("testPathFilter Exception " + e + StringUtils.stringifyException(e)); + LOG.info("testRaidDfs Exception " + e + + StringUtils.stringifyException(e)); throw e; } finally { + if (cnode != null) { cnode.stop(); cnode.join(); } myTearDown(); } - LOG.info("Test testPathFilter completed."); + LOG.info("Test testRaidDfs completed."); } /** @@ -217,7 +290,7 @@ public class TestRaidDfs extends TestCas try { Path file = new Path("/user/raid/raidtest/file1"); - createOldFile(fileSys, file, 1, 7, 8192L); + createTestFile(fileSys, file, 1, 7, 8192L); // filter all filesystem calls from client Configuration clientConf = new Configuration(conf); @@ -242,13 +315,15 @@ public class TestRaidDfs extends TestCas myTearDown(); } } - + // // creates a file and populate it with random data. Returns its crc. // - private long createOldFile(FileSystem fileSys, Path name, int repl, int numBlocks, long blocksize) + public static long createTestFile(FileSystem fileSys, Path name, int repl, + int numBlocks, long blocksize) throws IOException { CRC32 crc = new CRC32(); + Random rand = new Random(); FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf().getInt("io.file.buffer.size", 4096), (short)repl, blocksize); @@ -264,19 +339,43 @@ public class TestRaidDfs extends TestCas } // - // validates that file matches the crc. + // Creates a file with partially full last block. Populate it with random + // data. Returns its crc. // - private void validateFile(FileSystem fileSys, Path name1, Path name2, long crc) + public static long createTestFilePartialLastBlock( + FileSystem fileSys, Path name, int repl, int numBlocks, long blocksize) throws IOException { + CRC32 crc = new CRC32(); + Random rand = new Random(); + FSDataOutputStream stm = fileSys.create(name, true, + fileSys.getConf().getInt("io.file.buffer.size", 4096), + (short)repl, blocksize); + // Write whole blocks. + byte[] b = new byte[(int)blocksize]; + for (int i = 1; i < numBlocks; i++) { + rand.nextBytes(b); + stm.write(b); + crc.update(b); + } + // Write partial block. + b = new byte[(int)blocksize/2 - 1]; + rand.nextBytes(b); + stm.write(b); + crc.update(b); - FileStatus stat1 = fileSys.getFileStatus(name1); - FileStatus stat2 = fileSys.getFileStatus(name2); - assertTrue(" Length of file " + name1 + " is " + stat1.getLen() + - " is different from length of file " + name1 + " " + stat2.getLen(), - stat1.getLen() == stat2.getLen()); + stm.close(); + return crc.getValue(); + } + // + // validates that file matches the crc. 
+ // + public static boolean validateFile(FileSystem fileSys, Path name, long length, + long crc) + throws IOException { + long numRead = 0; CRC32 newcrc = new CRC32(); - FSDataInputStream stm = fileSys.open(name2); + FSDataInputStream stm = fileSys.open(name); final byte[] b = new byte[4192]; int num = 0; while (num >= 0) { @@ -284,19 +383,28 @@ public class TestRaidDfs extends TestCas if (num < 0) { break; } + numRead += num; newcrc.update(b, 0, num); } stm.close(); + + if (numRead != length) { + LOG.info("Number of bytes read " + numRead + + " does not match file size " + length); + return false; + } + LOG.info(" Newcrc " + newcrc.getValue() + " old crc " + crc); if (newcrc.getValue() != crc) { - fail("CRC mismatch of files " + name1 + " with file " + name2); + LOG.info("CRC mismatch of file " + name + ": " + newcrc + " vs. " + crc); } + return true; } /* * The Data directories for a datanode */ - static private File[] getDataNodeDirs(int i) throws IOException { + private static File[] getDataNodeDirs(int i) throws IOException { File base_dir = new File(System.getProperty("test.build.data"), "dfs/"); File data_dir = new File(base_dir, "data"); File dir1 = new File(data_dir, "data"+(2*i+1)); @@ -353,10 +461,32 @@ public class TestRaidDfs extends TestCas (numCorrupted + numDeleted) > 0); } - // - // Corrupt specified block of file - // - void corruptBlock(Path file, Block blockNum) throws IOException { - corruptBlock(file, blockNum, NUM_DATANODES, true); + public static void corruptBlock(Path file, Block blockNum, + int numDataNodes, long offset) throws IOException { + long id = blockNum.getBlockId(); + + // Now deliberately remove/truncate data blocks from the block. + // + for (int i = 0; i < numDataNodes; i++) { + File[] dirs = getDataNodeDirs(i); + + for (int j = 0; j < dirs.length; j++) { + File[] blocks = dirs[j].listFiles(); + assertTrue("Blocks do not exist in data-dir", (blocks != null) && (blocks.length >= 0)); + for (int idx = 0; idx < blocks.length; idx++) { + if (blocks[idx].getName().startsWith("blk_" + id) && + !blocks[idx].getName().endsWith(".meta")) { + // Corrupt + File f = blocks[idx]; + RandomAccessFile raf = new RandomAccessFile(f, "rw"); + raf.seek(offset); + int data = raf.readInt(); + raf.seek(offset); + raf.writeInt(data+1); + LOG.info("Corrupted block " + blocks[idx]); + } + } + } + } } } Added: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java?rev=1021873&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java (added) +++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java Tue Oct 12 18:23:36 2010 @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.raid; + +import java.io.File; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +import junit.framework.TestCase; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; + +public class TestDirectoryTraversal extends TestCase { + final static Log LOG = LogFactory.getLog( + "org.apache.hadoop.raid.TestDirectoryTraversal"); + final static String TEST_DIR = new File(System.getProperty("test.build.data", + "build/contrib/raid/test/data")).getAbsolutePath(); + + MiniDFSCluster dfs = null; + FileSystem fs = null; + Configuration conf = null; + + /** + * Test basic enumeration. + */ + public void testEnumeration() throws IOException { + mySetup(); + + try { + Path topDir = new Path(TEST_DIR + "/testenumeration"); + + createTestTree(topDir); + + LOG.info("Enumerating files"); + List startPaths = new LinkedList(); + startPaths.add(fs.getFileStatus(topDir)); + DirectoryTraversal dt = new DirectoryTraversal(fs, startPaths); + + List selected = new LinkedList(); + while (true) { + FileStatus f = dt.getNextFile(); + if (f == null) break; + assertEquals(false, f.isDir()); + LOG.info(f.getPath()); + selected.add(f); + } + assertEquals(5, selected.size()); + + LOG.info("Enumerating directories"); + startPaths.clear(); + startPaths.add(fs.getFileStatus(topDir)); + dt = new DirectoryTraversal(fs, startPaths); + selected.clear(); + while (true) { + FileStatus dir = dt.getNextDirectory(); + if (dir == null) break; + assertEquals(true, dir.isDir()); + LOG.info(dir.getPath()); + selected.add(dir); + } + assertEquals(4, selected.size()); + } finally { + myTearDown(); + } + } + + public void testSuspension() throws IOException { + mySetup(); + + try { + Path topDir = new Path(TEST_DIR + "/testenumeration"); + + createTestTree(topDir); + + String top = topDir.toString(); + List startPaths = new LinkedList(); + startPaths.add(fs.getFileStatus(new Path(top + "/a"))); + startPaths.add(fs.getFileStatus(new Path(top + "/b"))); + DirectoryTraversal dt = new DirectoryTraversal(fs, startPaths); + + int limit = 2; + short targetRepl = 1; + Path raid = new Path("/raid"); + List selected = dt.selectFilesToRaid(conf, targetRepl, raid, + 0, limit); + for (FileStatus f: selected) { + LOG.info(f.getPath()); + } + assertEquals(limit, selected.size()); + + selected = dt.selectFilesToRaid(conf, targetRepl, raid, 0, limit); + for (FileStatus f: selected) { + LOG.info(f.getPath()); + } + assertEquals(limit, selected.size()); + } finally { + myTearDown(); + } + } + + /** + * Creates a test directory tree. 
+ * top + * / | \ + * / | f5 + * a b___ + * / \ |\ \ + * f1 f2 f3f4 c + */ + private void createTestTree(Path topDir) throws IOException { + String top = topDir.toString(); + fs.delete(topDir, true); + + fs.mkdirs(topDir); + fs.create(new Path(top + "/f5")).close(); + + fs.mkdirs(new Path(top + "/a")); + createTestFile(new Path(top + "/a/f1")); + createTestFile(new Path(top + "/a/f2")); + + fs.mkdirs(new Path(top + "/b")); + fs.mkdirs(new Path(top + "/b/c")); + createTestFile(new Path(top + "/b/f3")); + createTestFile(new Path(top + "/b/f4")); + } + + private void createTestFile(Path file) throws IOException { + long blockSize = 8192; + byte[] bytes = new byte[(int)blockSize]; + FSDataOutputStream stm = fs.create(file, false, 4096, (short)1, blockSize); + stm.write(bytes); + stm.write(bytes); + stm.write(bytes); + stm.close(); + FileStatus stat = fs.getFileStatus(file); + assertEquals(blockSize, stat.getBlockSize()); + } + + private void mySetup() throws IOException { + conf = new Configuration(); + dfs = new MiniDFSCluster(conf, 6, true, null); + dfs.waitActive(); + fs = dfs.getFileSystem(); + } + + private void myTearDown() { + if (dfs != null) { dfs.shutdown(); } + } +} Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java?rev=1021873&r1=1021872&r2=1021873&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java (original) +++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java Tue Oct 12 18:23:36 2010 @@ -82,6 +82,7 @@ public class TestRaidHar extends TestCas conf.setBoolean("fs.raidnode.local", local); conf.set("raid.server.address", "localhost:0"); + conf.set(RaidNode.RAID_LOCATION_KEY, "/destraid"); // create a dfs and map-reduce cluster final int taskTrackers = 4; @@ -101,12 +102,12 @@ public class TestRaidHar extends TestCas /** * create raid.xml file for RaidNode */ - private void mySetup(String srcPath, long targetReplication, - long metaReplication, long stripeLength ) throws Exception { + private void mySetup(long targetReplication, + long metaReplication, long stripeLength) throws Exception { FileWriter fileWriter = new FileWriter(CONFIG_FILE); fileWriter.write("\n"); String str = " " + - " " + + " " + " " + " /destraid " + " " + @@ -162,7 +163,6 @@ public class TestRaidHar extends TestCas public void testRaidHar() throws Exception { LOG.info("Test testRaidHar started."); - String srcPaths [] = { "/user/test/raidtest", "/user/test/raid*" }; long blockSizes [] = {1024L}; long stripeLengths [] = {5}; long targetReplication = 1; @@ -172,13 +172,11 @@ public class TestRaidHar extends TestCas createClusters(true); try { - for (String srcPath : srcPaths) { - for (long blockSize : blockSizes) { - for (long stripeLength : stripeLengths) { - doTestHar(iter, srcPath, targetReplication, metaReplication, - stripeLength, blockSize, numBlock); - iter++; - } + for (long blockSize : blockSizes) { + for (long stripeLength : stripeLengths) { + doTestHar(iter, targetReplication, metaReplication, + stripeLength, blockSize, numBlock); + iter++; } } } finally { @@ -191,14 +189,14 @@ public class TestRaidHar extends TestCas * Create parity file, delete original file and then validate that * parity file is automatically deleted. 
*/ - private void doTestHar(int iter, String srcPath, long targetReplication, + private void doTestHar(int iter, long targetReplication, long metaReplication, long stripeLength, long blockSize, int numBlock) throws Exception { LOG.info("doTestHar started---------------------------:" + " iter " + iter + " blockSize=" + blockSize + " stripeLength=" + stripeLength); - mySetup(srcPath, targetReplication, metaReplication, stripeLength); - RaidShell shell = null; + mySetup(targetReplication, metaReplication, stripeLength); Path dir = new Path("/user/test/raidtest/subdir/"); + Path file1 = new Path(dir + "/file" + iter); RaidNode cnode = null; try { Path destPath = new Path("/destraid/user/test/raidtest/subdir"); @@ -211,21 +209,9 @@ public class TestRaidHar extends TestCas LOG.info("doTestHar created test files for iteration " + iter); // create an instance of the RaidNode - cnode = RaidNode.createRaidNode(null, conf); - int times = 10; - - while (times-- > 0) { - try { - shell = new RaidShell(conf, cnode.getListenerAddress()); - } catch (Exception e) { - LOG.info("doTestHar unable to connect to " + - cnode.getListenerAddress() + " retrying...."); - Thread.sleep(1000); - continue; - } - break; - } - LOG.info("doTestHar created RaidShell."); + Configuration localConf = new Configuration(conf); + localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid"); + cnode = RaidNode.createRaidNode(null, localConf); FileStatus[] listPaths = null; int maxFilesFound = 0; @@ -234,6 +220,7 @@ public class TestRaidHar extends TestCas try { listPaths = fileSys.listStatus(destPath); int count = 0; + Path harPath = null; int filesFound = 0; if (listPaths != null) { for (FileStatus s : listPaths) { @@ -250,6 +237,7 @@ public class TestRaidHar extends TestCas // files since some parity files might get deleted by the // purge thread. assertEquals(10, maxFilesFound); + harPath = s.getPath(); count++; } } @@ -260,11 +248,12 @@ public class TestRaidHar extends TestCas } catch (FileNotFoundException e) { //ignore } - LOG.info("doTestHar waiting for files to be raided and parity files to be har'ed and deleted. Found " + + LOG.info("doTestHar waiting for files to be raided and parity files to be har'ed and deleted. Found " + (listPaths == null ? "none" : listPaths.length)); Thread.sleep(1000); // keep waiting + } - + fileSys.delete(dir, true); // wait till raid file is deleted int count = 1; @@ -291,7 +280,6 @@ public class TestRaidHar extends TestCas StringUtils.stringifyException(e)); throw e; } finally { - shell.close(); if (cnode != null) { cnode.stop(); cnode.join(); } } LOG.info("doTestHar completed:" + " blockSize=" + blockSize +
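/*
 * Illustrative sketch (editorial, not part of the committed patch): both TestRaidDfs
 * and TestRaidHar now start the RaidNode the same way, pointing it at the parity
 * location through RaidNode.RAID_LOCATION_KEY instead of a per-policy srcPath. A
 * condensed bring-up/tear-down using only the configuration keys exercised in these
 * tests is shown below; values are the test defaults, not production settings, and
 * the fragment is assumed to sit inside a test method declared to throw Exception,
 * as in these tests.
 */
Configuration raidConf = new Configuration(conf);
raidConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");  // where parity files are written
raidConf.setBoolean("fs.raidnode.local", true);         // raid locally, no map-reduce cluster
raidConf.set("raid.server.address", "localhost:0");     // bind the RPC server to any free port
RaidNode node = RaidNode.createRaidNode(null, raidConf);
try {
  // ... create files, wait for parity, corrupt blocks and validate ...
} finally {
  if (node != null) { node.stop(); node.join(); }
}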