From: omalley@apache.org
To: hadoop-commits@lucene.apache.org
Reply-To: hadoop-dev@lucene.apache.org
Subject: svn commit: r555360 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/tools/Logalyzer.java src/java/org/apache/hadoop/util/CopyFiles.java src/test/org/apache/hadoop/fs/TestCopyFiles.java
Date: Wed, 11 Jul 2007 18:38:05 -0000
Message-Id: <20070711183806.D719F1A981A@eris.apache.org>

Author: omalley
Date: Wed Jul 11 11:38:00 2007
New Revision: 555360

URL: http://svn.apache.org/viewvc?view=rev&rev=555360
Log:
HADOOP-1533. Add persistent logging of errors to distcp.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/tools/Logalyzer.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=555360&r1=555359&r2=555360
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Jul 11 11:38:00 2007
@@ -286,6 +286,9 @@
  88. HADOOP-1554.  Log killed tasks to the job history and display them on the
      web/ui. (Devaraj Das via omalley)
 
+ 89. HADOOP-1533.  Add persistent error logging for distcp. The logs are stored
+     into a specified hdfs directory. (Senthil Subramanian via omalley)
+
 Release 0.13.0 - 2007-06-08
 
  1. HADOOP-1047.  Fix TestReplication to succeed more reliably.
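The change below threads a new logPath argument through CopyFiles.copy() and exposes it on the command line as "-log <logdir>". For orientation, here is a minimal sketch of a programmatic caller against the new signature; the cluster address (namenode:8020) and the /srcdat, /destdat and /logs paths are hypothetical placeholders, and passing null for logPath (as Logalyzer does below) falls back to a generated "_distcp_logs_<timestamp>" directory under the destination:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.util.CopyFiles;

    public class DistcpLogExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical log directory for the per-file error records.
        Path logPath = new Path("hdfs://namenode:8020/logs");
        // New signature introduced by this commit:
        // copy(conf, srcPath, destPath, logPath, srcAsList, ignoreReadFailures)
        CopyFiles.copy(conf, "hdfs://namenode:8020/srcdat",
                       "hdfs://namenode:8020/destdat", logPath,
                       false /* srcAsList */, true /* ignoreReadFailures */);
      }
    }

The equivalent command-line run, mirroring the argument order exercised by the updated TestCopyFiles below, is roughly: bin/hadoop distcp hdfs://namenode:8020/srcdat hdfs://namenode:8020/destdat -log hdfs://namenode:8020/logs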
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/tools/Logalyzer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/tools/Logalyzer.java?view=diff&rev=555360&r1=555359&r2=555360
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/tools/Logalyzer.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/tools/Logalyzer.java Wed Jul 11 11:38:00 2007
@@ -177,7 +177,7 @@
   {
     String destURL = "hdfs://" + fsConfig.get("fs.default.name", "local") +
                      archiveDirectory;
-    CopyFiles.copy(fsConfig, logListURI, destURL, true, false);
+    CopyFiles.copy(fsConfig, logListURI, destURL, null, true, false);
   }
   
   /**

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java?view=diff&rev=555360&r1=555359&r2=555360
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java Wed Jul 11 11:38:00 2007
@@ -40,6 +40,7 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
@@ -54,7 +55,6 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 
 /**
  * A Map-reduce program to recursively copy directories between
@@ -65,7 +65,7 @@
   private static final String S3 = "s3";
   
   private static final String usage = "distcp "+
-    "[-i] <srcurl> | -f <urilist> <desturl>";
+    "[-i] <srcurl> | -f <urilist> <desturl> [-log <logdir>]";
   
   private static final long MIN_BYTES_PER_MAP = 1L << 28;
   private static final int MAX_NUM_MAPS = 10000;
@@ -93,11 +93,13 @@
    * @param jobConf : The handle to the jobConf object to be initialized.
    * @param srcPaths : The source paths.
    * @param destPath : The destination path.
+   * @param logPath : The log path.
    * @param ignoreReadFailures : Ignore read failures?
    * @throws IOException
    */
   public abstract void setup(Configuration conf, JobConf jobConf,
-      String[] srcPaths, String destPath, boolean ignoreReadFailures)
+      String[] srcPaths, String destPath,
+      Path logPath, boolean ignoreReadFailures)
     throws IOException;
   
   /**
@@ -198,7 +200,8 @@
       // open source file
       Path srcFile = new Path(srcPath, src);
       FSDataInputStream in = srcFileSys.open(srcFile);
-      long totalBytes = srcFileSys.getLength(srcFile);
+      FileStatus srcFileStatus = srcFileSys.getFileStatus(srcFile);
+      long totalBytes = srcFileStatus.getLen();
       
       // create directories to hold destination file and create destFile
       Path destFile = new Path(destPath, src);
@@ -244,11 +247,12 @@
      * @param jobConf : The handle to the jobConf object to be initialized.
      * @param srcPaths : The source URIs.
      * @param destPath : The destination URI.
+     * @param logPath : The log Path.
      * @param ignoreReadFailures : Ignore read failures?
     */
    public void setup(Configuration conf, JobConf jobConf,
                      String[] srcPaths, String destPath,
-                     boolean ignoreReadFailures)
+                     Path logPath, boolean ignoreReadFailures)
      throws IOException
    {
      URI srcURI = toURI(srcPaths[0]);
@@ -284,20 +288,15 @@
      jobConf.setSpeculativeExecution(false);
      jobConf.setInputFormat(SequenceFileInputFormat.class);
      
-      jobConf.setOutputKeyClass(Text.class);
-      jobConf.setOutputValueClass(Text.class);
-      jobConf.setOutputFormat(SequenceFileOutputFormat.class);
-      
      jobConf.setMapperClass(FSCopyFilesMapper.class);
      
-      jobConf.setNumReduceTasks(1);
+      jobConf.setNumReduceTasks(0);
      jobConf.setBoolean(readFailuresAttribute, ignoreReadFailures);
      
      Random r = new Random();
      Path jobDirectory = new Path(jobConf.getSystemDir(), "distcp_" +
                                   Integer.toString(r.nextInt(Integer.MAX_VALUE), 36));
      Path inDir = new Path(jobDirectory, "in");
-      Path fakeOutDir = new Path(jobDirectory, "out");
      FileSystem fileSys = FileSystem.get(jobConf);
      if (!fileSys.mkdirs(inDir)) {
        throw new IOException("Mkdirs failed to create " +
@@ -306,7 +305,7 @@
      jobConf.set("distcp.job.dir", jobDirectory.toString());
      
      jobConf.setInputPath(inDir);
-      jobConf.setOutputPath(fakeOutDir);
+      jobConf.setOutputPath(logPath);
      
      // create new sequence-files for holding paths
      ArrayList pathList = new ArrayList();
@@ -317,7 +316,7 @@
      while(!pathList.isEmpty()) {
        Path top = pathList.remove(0);
        if (srcfs.isFile(top)) {
-          totalBytes += srcfs.getLength(top);
+          totalBytes += srcfs.getFileStatus(top).getLen();
          top = makeRelative(rootPath, top);
          finalPathList.add(top.toString());
        } else {
@@ -406,6 +405,8 @@
      try {
        copy(src, reporter);
      } catch (IOException except) {
+        out.collect(null, new Text("Failed to copy " + src + " : " +
+                                   StringUtils.stringifyException(except)));
        if (ignoreReadFailures) {
          reporter.setStatus("Failed to copy " + src + " : " +
                             StringUtils.stringifyException(except));
@@ -441,11 +442,12 @@
     * @param jobConf : The handle to the jobConf object to be initialized.
     * @param srcPaths : The source URI.
     * @param destPath : The destination URI.
+     * @param logPath : The log Path.
     * @param ignoreReadFailures : Ignore read failures?
     */
    public void setup(Configuration conf, JobConf jobConf,
                      String[] srcPaths, String destPath,
-                     boolean ignoreReadFailures)
+                     Path logPath, boolean ignoreReadFailures)
      throws IOException
    {
      //Destination
@@ -453,16 +455,12 @@
      jobConf.set("copy.dest.fs", destURI.toString());
      destPath = destURI.getPath();
      jobConf.set("copy.dest.path", destPath);
-      
+
      //Setup the MR-job configuration
      jobConf.setSpeculativeExecution(false);
      jobConf.setInputFormat(SequenceFileInputFormat.class);
      
-      jobConf.setOutputKeyClass(Text.class);
-      jobConf.setOutputValueClass(Text.class);
-      jobConf.setOutputFormat(SequenceFileOutputFormat.class);
-      
      jobConf.setMapperClass(HTTPCopyFilesMapper.class);
      
      JobClient client = new JobClient(jobConf);
@@ -481,8 +479,7 @@
      jobConf.setInputPath(jobInputDir);
      jobConf.set("distcp.job.dir", jobDirectory.toString());
      
-      Path jobOutputDir = new Path(jobDirectory, "out");
-      jobConf.setOutputPath(jobOutputDir);
+      jobConf.setOutputPath(logPath);
      
      for(int i=0; i < srcPaths.length; ++i) {
        Path ipFile = new Path(jobInputDir, "part" + i);
@@ -514,8 +511,7 @@
      
      try {
        //Destination
-        destFileSys = 
-          FileSystem.getNamed(job.get("copy.dest.fs", "local"), job);
+        destFileSys = FileSystem.get(URI.create(job.get("copy.dest.fs", "file:///")), job);
        destPath = new Path(job.get("copy.dest.path", "/"));
        if (!destFileSys.exists(destPath)) {
          return;
@@ -577,7 +573,7 @@
    /* handle exceptions */
    private void handleException( Reporter reporter, Text key, Throwable e )
      throws IOException {
-      String errMsg = "Failed to copy from: " + (Text)key;
+      String errMsg = "Failed to copy from: " + key;
      reporter.setStatus(errMsg);
      if ( !ignoreReadFailures ) {
        throw new IOException(errMsg);
@@ -700,10 +696,12 @@
   * @param conf Configuration
   * @param srcPath Source path URL
   * @param destPath Destination path URL
+   * @param logPath the log path
   * @param srcAsList List of source URLs to copy.
   * @param ignoreReadFailures True if we are to ignore read failures.
   */
-  public static void copy(Configuration conf, String srcPath, String destPath,
+  public static void copy(Configuration conf, String srcPath,
+                          String destPath, Path logPath,
                          boolean srcAsList, boolean ignoreReadFailures)
    throws IOException {
@@ -715,6 +713,12 @@
    URI srcURI = toURI(srcPath);
    toURI(destPath);
    
+    // default logPath
+    if (logPath == null) {
+      logPath = new Path(toURI(destPath).getPath() + "/_distcp_logs_" +
+                         System.currentTimeMillis());
+    }
+
    //Create the task-specific mapper
    CopyFilesMapper mapper = null;
    String[] srcPaths = null;
@@ -728,7 +732,7 @@
      String[] dfsUrls = parseInputFile(HDFS, srcPaths);
      if (dfsUrls != null) {
        for(int i=0; i < dfsUrls.length; ++i) {
-          copy(conf, dfsUrls[i], destPath, false, ignoreReadFailures);
+          copy(conf, dfsUrls[i], destPath, logPath, false, ignoreReadFailures);
        }
      }
      
@@ -736,7 +740,7 @@
      String[] localUrls = parseInputFile("file", srcPaths);
      if (localUrls != null) {
        for(int i=0; i < localUrls.length; ++i) {
-          copy(conf, localUrls[i], destPath, false, ignoreReadFailures);
+          copy(conf, localUrls[i], destPath, logPath, false, ignoreReadFailures);
        }
      }
      
@@ -766,7 +770,7 @@
    }
    
    //Initialize the mapper
-    mapper.setup(conf, jobConf, srcPaths, destPath, ignoreReadFailures);
+    mapper.setup(conf, jobConf, srcPaths, destPath, logPath, ignoreReadFailures);
    
    //We are good to go!
    try {
@@ -787,6 +791,7 @@
  public int run(String[] args) throws Exception {
    String srcPath = null;
    String destPath = null;
+    Path logPath = null;
    boolean ignoreReadFailures = false;
    boolean srcAsList = false;
    
@@ -799,6 +804,8 @@
        srcPath = args[idx];
      } else if (destPath == null) {
        destPath = args[idx];
+      } else if ("-log".equals(args[idx])) {
+        logPath = new Path(args[++idx]);
      } else {
        System.out.println(usage);
        return -1;
@@ -810,9 +817,41 @@
      System.out.println(usage);
      return -1;
    }
-    
+
+    // default logPath
+    if (logPath == null) {
+      logPath = new Path(toURI(destPath).getPath() + "/_distcp_logs_" +
+                         System.currentTimeMillis());
+      System.out.println("Using default logPath: " + logPath);
+    }
+
+    // verify that srcPath and destPath are valid, and that logPath is valid and does not exist
+    try {
+      URI srcURI = toURI(srcPath);
+      FileSystem srcfs = FileSystem.get(srcURI, conf);
+      if (!srcfs.exists(new Path(srcPath))) {
+        System.out.println(srcPath + " does not exist.");
+        return -1;
+      }
+
+      URI destURI = toURI(destPath);
+      FileSystem destfs = FileSystem.get(destURI, conf);
+      if (destfs.exists(new Path(destPath))) {
+        System.out.println("WARNING: " + destPath + " already exists.");
+      }
+
+      FileSystem logfs = FileSystem.get(logPath.toUri(), conf);
+      if (logfs.exists(logPath)) {
+        System.out.println("ERROR: " + logPath + " already exists.");
+        return -1;
+      }
+    } catch (Exception e) {
+      System.err.println("Copy failed: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+
    try {
-      copy(conf, srcPath, destPath, srcAsList, ignoreReadFailures);
+      copy(conf, srcPath, destPath, logPath, srcAsList, ignoreReadFailures);
    } catch (Exception e) {
      System.err.println("Copy failed: "+StringUtils.stringifyException(e));
      return -1;

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java?view=diff&rev=555360&r1=555359&r2=555360
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java Wed Jul 11 11:38:00 2007
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.util.Random;
+import java.net.URI;
 import junit.framework.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.dfs.MiniDFSCluster;
@@ -182,11 +183,17 @@
      if (!"local".equals(namenode)) {
        MyFile[] files = createFiles(namenode, "/srcdat");
        new CopyFiles().doMain(conf, new String[] {"hdfs://"+namenode+"/srcdat",
-                                                   "hdfs://"+namenode+"/destdat"});
+                                                   "hdfs://"+namenode+"/destdat",
+                                                   "-log",
+                                                   "hdfs://"+namenode+"/logs"});
        assertTrue("Source and destination directories do not match.",
                   checkFiles(namenode, "/destdat", files));
+        FileSystem fs = FileSystem.get(URI.create("hdfs://"+namenode+"/logs"), conf);
+        assertTrue("Log directory does not exist.",
+                   fs.exists(new Path("hdfs://"+namenode+"/logs")));
        deldir(namenode, "/destdat");
        deldir(namenode, "/srcdat");
+        deldir(namenode, "/logs");
      }
    } finally {
      if (cluster != null) { cluster.shutdown(); }
@@ -204,10 +211,16 @@
      if (!"local".equals(namenode)) {
        MyFile[] files = createFiles("local", TEST_ROOT_DIR+"/srcdat");
        new CopyFiles().doMain(conf, new String[] {"file://"+TEST_ROOT_DIR+"/srcdat",
-                                                   "hdfs://"+namenode+"/destdat"});
+                                                   "hdfs://"+namenode+"/destdat",
+                                                   "-log",
+                                                   "hdfs://"+namenode+"/logs"});
        assertTrue("Source and destination directories do not match.",
                   checkFiles(namenode, "/destdat", files));
+        FileSystem fs = FileSystem.get(URI.create("hdfs://"+namenode+"/logs"), conf);
+        assertTrue("Log directory does not exist.",
+                   fs.exists(new Path("hdfs://"+namenode+"/logs")));
        deldir(namenode, "/destdat");
+        deldir(namenode, "/logs");
        deldir("local", TEST_ROOT_DIR+"/srcdat");
      }
    } finally {
@@ -226,10 +239,16 @@
      if (!"local".equals(namenode)) {
        MyFile[] files = createFiles(namenode, "/srcdat");
        new CopyFiles().doMain(conf, new String[] {"hdfs://"+namenode+"/srcdat",
-                                                   "file://"+TEST_ROOT_DIR+"/destdat"});
+                                                   "file://"+TEST_ROOT_DIR+"/destdat",
+                                                   "-log",
+                                                   TEST_ROOT_DIR+"/logs"});
        assertTrue("Source and destination directories do not match.",
                   checkFiles("local", TEST_ROOT_DIR+"/destdat", files));
+        FileSystem fs = FileSystem.get(URI.create("hdfs://"+namenode+"/logs"), conf);
+        assertTrue("Log directory does not exist.",
+                   fs.exists(new Path(TEST_ROOT_DIR+"/logs")));
        deldir("local", TEST_ROOT_DIR+"/destdat");
+        deldir("local", TEST_ROOT_DIR+"/logs");
        deldir(namenode, "/srcdat");
      }
    } finally {
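With the reduce phase disabled and the job's output path pointed at logPath, the "Failed to copy ..." records that the mapper emits via out.collect() end up as files under the log directory (with the job's default output format these would typically be text part files such as part-00000; the exact file names and layout are an assumption here, not something this commit pins down). A minimal sketch of scanning one such log after a run, again assuming the hypothetical namenode:8020 address and /logs directory:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ReadDistcpLog {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020/"), conf);
        // Assumed part-file name; a real run may produce one file per map task.
        FSDataInputStream in = fs.open(new Path("/logs/part-00000"));
        BufferedReader reader = new BufferedReader(new InputStreamReader(in));
        String line;
        while ((line = reader.readLine()) != null) {
          // Each record carries "Failed to copy <src> : <stack trace>".
          System.out.println(line);
        }
        reader.close();
      }
    }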