Return-Path: Delivered-To: apmail-hadoop-core-commits-archive@www.apache.org Received: (qmail 76334 invoked from network); 20 Jun 2008 22:34:56 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 20 Jun 2008 22:34:56 -0000 Received: (qmail 58150 invoked by uid 500); 20 Jun 2008 22:34:58 -0000 Delivered-To: apmail-hadoop-core-commits-archive@hadoop.apache.org Received: (qmail 58119 invoked by uid 500); 20 Jun 2008 22:34:58 -0000 Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: core-dev@hadoop.apache.org Delivered-To: mailing list core-commits@hadoop.apache.org Received: (qmail 58110 invoked by uid 99); 20 Jun 2008 22:34:58 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Jun 2008 15:34:58 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Jun 2008 22:34:16 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 869482388A0A; Fri, 20 Jun 2008 15:34:05 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r670086 - in /hadoop/core/trunk: ./ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/ Date: Fri, 20 Jun 2008 22:34:05 -0000 To: core-commits@hadoop.apache.org From: acmurthy@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080620223405.869482388A0A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: acmurthy Date: Fri Jun 20 15:34:04 2008 New Revision: 670086 URL: http://svn.apache.org/viewvc?rev=670086&view=rev Log: HADOOP-3598. Ensure that temporary task-output directories are not created if they are not necessary e.g. for Maps with no side-effect files. Modified: hadoop/core/trunk/CHANGES.txt hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapFileOutputFormat.java hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileOutputFormat.java hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TextOutputFormat.java hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java Modified: hadoop/core/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=670086&r1=670085&r2=670086&view=diff ============================================================================== --- hadoop/core/trunk/CHANGES.txt (original) +++ hadoop/core/trunk/CHANGES.txt Fri Jun 20 15:34:04 2008 @@ -102,6 +102,10 @@ HADOOP-3512. Separate out the tools into a tools jar. (omalley) + HADOOP-3598. Ensure that temporary task-output directories are not created + if they are not necessary e.g. for Maps with no side-effect files. + (acmurthy) + NEW FEATURES HADOOP-3074. Provides a UrlStreamHandler for DFS and other FS, Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java?rev=670086&r1=670085&r2=670086&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java Fri Jun 20 15:34:04 2008 @@ -195,5 +195,40 @@ return name == null ? null: new Path(name); } + /** + * Helper function to create the task's temporary output directory and + * return the path to the task's output file. + * + * @param conf job-configuration + * @param name temporary task-output filename + * @return path to the task's temporary output file + * @throws IOException + */ + protected static Path getTaskOutputPath(JobConf conf, String name) + throws IOException { + // ${mapred.job.dir} + Path outputPath = getOutputPath(conf); + if (outputPath == null) { + throw new IOException("Undefined job output-path"); + } + + // ${mapred.out.dir}/_temporary + Path jobTmpDir = new Path(outputPath, MRConstants.TEMP_DIR_NAME); + FileSystem fs = jobTmpDir.getFileSystem(conf); + if (!fs.exists(jobTmpDir)) { + throw new IOException("The temporary job-output directory " + + jobTmpDir.toString() + " doesn't exist!"); + } + + // ${mapred.out.dir}/_temporary/_${taskid} + Path taskTmpDir = getWorkOutputPath(conf); + if (!fs.mkdirs(taskTmpDir)) { + throw new IOException("Mkdirs failed to create " + + taskTmpDir.toString()); + } + + // ${mapred.out.dir}/_temporary/_${taskid}/${name} + return new Path(taskTmpDir, name); + } } Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapFileOutputFormat.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapFileOutputFormat.java?rev=670086&r1=670085&r2=670086&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapFileOutputFormat.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapFileOutputFormat.java Fri Jun 20 15:34:04 2008 @@ -42,14 +42,10 @@ public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { - - Path outputPath = getWorkOutputPath(job); - FileSystem fs = outputPath.getFileSystem(job); - if (!fs.exists(outputPath)) { - throw new IOException("Output directory doesnt exist"); - } - Path file = new Path(outputPath, name); + // get the path of the temporary output file + Path file = FileOutputFormat.getTaskOutputPath(job, name); + FileSystem fs = file.getFileSystem(job); CompressionCodec codec = null; CompressionType compressionType = CompressionType.NONE; if (getCompressOutput(job)) { Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java?rev=670086&r1=670085&r2=670086&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java Fri Jun 20 15:34:04 2008 @@ -130,13 +130,10 @@ getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { - - Path outputPath = getWorkOutputPath(job); - FileSystem fs = outputPath.getFileSystem(job); - if (!fs.exists(outputPath)) { - throw new IOException("Output directory doesnt exist"); - } - Path file = new Path(outputPath, name); + // get the path of the temporary output file + Path file = FileOutputFormat.getTaskOutputPath(job, name); + + FileSystem fs = file.getFileSystem(job); CompressionCodec codec = null; CompressionType compressionType = CompressionType.NONE; if (getCompressOutput(job)) { Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileOutputFormat.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileOutputFormat.java?rev=670086&r1=670085&r2=670086&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileOutputFormat.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileOutputFormat.java Fri Jun 20 15:34:04 2008 @@ -39,13 +39,10 @@ FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { - - Path outputPath = getWorkOutputPath(job); - FileSystem fs = outputPath.getFileSystem(job); - if (!fs.exists(outputPath)) { - throw new IOException("Output directory doesnt exist"); - } - Path file = new Path(outputPath, name); + // get the path of the temporary output file + Path file = FileOutputFormat.getTaskOutputPath(job, name); + + FileSystem fs = file.getFileSystem(job); CompressionCodec codec = null; CompressionType compressionType = CompressionType.NONE; if (getCompressOutput(job)) { Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java?rev=670086&r1=670085&r2=670086&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java Fri Jun 20 15:34:04 2008 @@ -493,6 +493,9 @@ > 0) { shouldBePromoted = true; } + } else { + LOG.info(getTaskID() + ": No outputs to promote from " + + taskOutputPath); } } } catch (IOException ioe) { Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=670086&r1=670085&r2=670086&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Jun 20 15:34:04 2008 @@ -1458,22 +1458,6 @@ localJobConf.set("mapred.task.id", task.getTaskID().toString()); keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles(); - // create _taskid directory in output path temporary directory. - Path outputPath = FileOutputFormat.getOutputPath(localJobConf); - if (outputPath != null) { - Path jobTmpDir = new Path(outputPath, MRConstants.TEMP_DIR_NAME); - FileSystem fs = jobTmpDir.getFileSystem(localJobConf); - if (fs.exists(jobTmpDir)) { - Path taskTmpDir = new Path(jobTmpDir, "_" + task.getTaskID()); - if (!fs.mkdirs(taskTmpDir)) { - throw new IOException("Mkdirs failed to create " - + taskTmpDir.toString()); - } - } else { - throw new IOException("The directory " + jobTmpDir.toString() - + " doesnt exist "); - } - } task.localizeConfiguration(localJobConf); List staticResolutions = NetUtils.getAllStaticResolutions(); Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TextOutputFormat.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TextOutputFormat.java?rev=670086&r1=670085&r2=670086&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TextOutputFormat.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TextOutputFormat.java Fri Jun 20 15:34:04 2008 @@ -108,16 +108,13 @@ String name, Progressable progress) throws IOException { - - String keyValueSeparator = job.get("mapred.textoutputformat.separator", "\t"); - Path dir = getWorkOutputPath(job); - FileSystem fs = dir.getFileSystem(job); - if (!fs.exists(dir)) { - throw new IOException("Output directory doesnt exist"); - } boolean isCompressed = getCompressOutput(job); + String keyValueSeparator = job.get("mapred.textoutputformat.separator", + "\t"); if (!isCompressed) { - FSDataOutputStream fileOut = fs.create(new Path(dir, name), progress); + Path file = FileOutputFormat.getTaskOutputPath(job, name); + FileSystem fs = file.getFileSystem(job); + FSDataOutputStream fileOut = fs.create(file, progress); return new LineRecordWriter(fileOut, keyValueSeparator); } else { Class codecClass = @@ -126,8 +123,11 @@ CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, job); // build the filename including the extension - Path filename = new Path(dir, name + codec.getDefaultExtension()); - FSDataOutputStream fileOut = fs.create(filename, progress); + Path file = + FileOutputFormat.getTaskOutputPath(job, + name + codec.getDefaultExtension()); + FileSystem fs = file.getFileSystem(job); + FSDataOutputStream fileOut = fs.create(file, progress); return new LineRecordWriter(new DataOutputStream (codec.createOutputStream(fileOut)), keyValueSeparator); Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java?rev=670086&r1=670085&r2=670086&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java Fri Jun 20 15:34:04 2008 @@ -39,7 +39,10 @@ } private static Path workDir = - new Path(new Path(System.getProperty("test.build.data", "."), "data"), + new Path(new Path( + new Path(System.getProperty("test.build.data", "."), + "data"), + MRConstants.TEMP_DIR_NAME), "TestMultipleTextOutputFormat"); private static void writeData(RecordWriter rw) throws IOException { @@ -81,6 +84,7 @@ public void testFormat() throws Exception { JobConf job = new JobConf(); + FileOutputFormat.setOutputPath(job, workDir.getParent().getParent()); FileOutputFormat.setWorkOutputPath(job, workDir); FileSystem fs = workDir.getFileSystem(job); if (!fs.mkdirs(workDir)) { Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java?rev=670086&r1=670085&r2=670086&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java Fri Jun 20 15:34:04 2008 @@ -38,7 +38,11 @@ public void testBinary() throws IOException { JobConf job = new JobConf(); FileSystem fs = FileSystem.getLocal(job); - Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred"); + + Path dir = + new Path(new Path(new Path(System.getProperty("test.build.data",".")), + MRConstants.TEMP_DIR_NAME), + "mapred"); Path file = new Path(dir, "testbinary.seq"); Random r = new Random(); long seed = r.nextLong(); @@ -49,6 +53,7 @@ fail("Failed to create output directory"); } + FileOutputFormat.setOutputPath(job, dir.getParent().getParent()); FileOutputFormat.setWorkOutputPath(job, dir); SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job, Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java?rev=670086&r1=670085&r2=670086&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java Fri Jun 20 15:34:04 2008 @@ -36,13 +36,17 @@ } } - private static Path workDir = - new Path(new Path(System.getProperty("test.build.data", "."), "data"), + private static Path workDir = + new Path(new Path( + new Path(System.getProperty("test.build.data", "."), + "data"), + MRConstants.TEMP_DIR_NAME), "TestTextOutputFormat"); @SuppressWarnings("unchecked") public void testFormat() throws Exception { JobConf job = new JobConf(); + FileOutputFormat.setOutputPath(job, workDir.getParent().getParent()); FileOutputFormat.setWorkOutputPath(job, workDir); FileSystem fs = workDir.getFileSystem(job); if (!fs.mkdirs(workDir)) { @@ -94,6 +98,7 @@ JobConf job = new JobConf(); String separator = "\u0001"; job.set("mapred.textoutputformat.separator", separator); + FileOutputFormat.setOutputPath(job, workDir.getParent().getParent()); FileOutputFormat.setWorkOutputPath(job, workDir); FileSystem fs = workDir.getFileSystem(job); if (!fs.mkdirs(workDir)) {