Subject: svn commit: r1294743 - in /hadoop/common/trunk/hadoop-tools/hadoop-streaming/src: main/java/org/apache/hadoop/streaming/PipeMapRed.java test/java/org/apache/hadoop/streaming/OutputOnlyApp.java test/java/org/apache/hadoop/streaming/TestUnconsumedInput.java
Date: Tue, 28 Feb 2012 17:43:09 -0000
To: common-commits@hadoop.apache.org
From: bobby@apache.org

Author: bobby
Date: Tue Feb 28 17:43:08 2012
New Revision: 1294743

URL: http://svn.apache.org/viewvc?rev=1294743&view=rev
Log:
MAPREDUCE-3790 Broken pipe on streaming job can lead to truncated output for a successful job (Jason Lowe via bobby)

Added:
    hadoop/common/trunk/hadoop-tools/hadoop-streaming/src/test/java/org/apache/hadoop/streaming/OutputOnlyApp.java
    hadoop/common/trunk/hadoop-tools/hadoop-streaming/src/test/java/org/apache/hadoop/streaming/TestUnconsumedInput.java
Modified:
    hadoop/common/trunk/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/PipeMapRed.java

Modified: hadoop/common/trunk/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=1294743&r1=1294742&r2=1294743&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/PipeMapRed.java Tue Feb 28 17:43:08 2012
@@ -521,11 +521,15 @@ public abstract class PipeMapRed {
       LOG.info("mapRedFinished");
       return;
     }
-    try {
-      if (clientOut_ != null) {
+    if (clientOut_ != null) {
+      try {
         clientOut_.flush();
         clientOut_.close();
+      } catch (IOException io) {
+        LOG.warn(io);
       }
+    }
+    try {
       waitOutputThreads();
     } catch (IOException io) {
       LOG.warn(io);
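The restructured try blocks above are the whole fix. Previously, if the streaming child exited without consuming all of its input, clientOut_.flush() or clientOut_.close() failed with a broken-pipe IOException and control jumped straight to the catch, skipping waitOutputThreads(). The threads draining the child's stdout could then be abandoned with records still in flight, truncating the job's output even though the job reported success. With the change, a failure to flush or close the child's stdin is only logged, and the output threads are always waited for.

A minimal standalone sketch of the failure mode, not part of this commit (the shell command, class name, and record count are illustrative, and a POSIX shell is assumed):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;

public class BrokenPipeSketch {
  public static void main(String[] args) throws Exception {
    // The child reads nothing from stdin, prints one line, and exits,
    // which closes the read end of the stdin pipe.
    Process p = new ProcessBuilder("sh", "-c", "echo some-output").start();
    OutputStream childIn = p.getOutputStream();
    try {
      byte[] record = "key\tvalue\n".getBytes("UTF-8");
      // Keep writing until the OS reports the pipe as broken.
      for (int i = 0; i < 1000000; i++) {
        childIn.write(record);
      }
      childIn.flush();
      childIn.close();
    } catch (IOException io) {
      // Expected: java.io.IOException: Broken pipe. This is the
      // exception that used to short-circuit mapRedFinished().
      System.err.println("write/close of child stdin failed: " + io);
    }
    // The child's output is still retrievable and must not be dropped;
    // this corresponds to PipeMapRed still calling waitOutputThreads().
    BufferedReader childOut = new BufferedReader(
        new InputStreamReader(p.getInputStream(), "UTF-8"));
    System.out.println("child produced: " + childOut.readLine());
    p.waitFor();
  }
}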
Added: hadoop/common/trunk/hadoop-tools/hadoop-streaming/src/test/java/org/apache/hadoop/streaming/OutputOnlyApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-streaming/src/test/java/org/apache/hadoop/streaming/OutputOnlyApp.java?rev=1294743&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-streaming/src/test/java/org/apache/hadoop/streaming/OutputOnlyApp.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-streaming/src/test/java/org/apache/hadoop/streaming/OutputOnlyApp.java Tue Feb 28 17:43:08 2012
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.IOException;
+
+/**
+ * An application that outputs a specified number of lines
+ * without consuming any input.
+ */
+public class OutputOnlyApp {
+  public static void main(String[] args) throws IOException {
+    if (args.length < 1) {
+      System.err.println("Usage: OutputOnlyApp NUMRECORDS");
+      return;
+    }
+    int numRecords = Integer.parseInt(args[0]);
+    while (numRecords-- > 0) {
+      System.out.println("key\tvalue");
+    }
+  }
+}
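Run standalone, OutputOnlyApp prints the requested number of tab-separated records and ignores stdin entirely; used as a streaming -mapper it guarantees the framework's input-feeding thread eventually hits a broken pipe. An illustrative invocation:

$ echo "this input is never read" | java org.apache.hadoop.streaming.OutputOnlyApp 3
key	value
key	value
key	value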
Added: hadoop/common/trunk/hadoop-tools/hadoop-streaming/src/test/java/org/apache/hadoop/streaming/TestUnconsumedInput.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-streaming/src/test/java/org/apache/hadoop/streaming/TestUnconsumedInput.java?rev=1294743&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-streaming/src/test/java/org/apache/hadoop/streaming/TestUnconsumedInput.java (added)
+++ hadoop/common/trunk/hadoop-tools/hadoop-streaming/src/test/java/org/apache/hadoop/streaming/TestUnconsumedInput.java Tue Feb 28 17:43:08 2012
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import static org.junit.Assert.*;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.Test;
+
+public class TestUnconsumedInput {
+  protected final int EXPECTED_OUTPUT_SIZE = 10000;
+  protected File INPUT_FILE = new File("stream_uncinput_input.txt");
+  protected File OUTPUT_DIR = new File("stream_uncinput_out");
+  // the map ignores its input and only generates output records
+  protected String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
+  protected String map = UtilTest.makeJavaCommand(OutputOnlyApp.class,
+      new String[]{Integer.toString(EXPECTED_OUTPUT_SIZE)});
+
+  private StreamJob job;
+
+  public TestUnconsumedInput() throws IOException
+  {
+    UtilTest utilTest = new UtilTest(getClass().getName());
+    utilTest.checkUserDir();
+    utilTest.redirectIfAntJunit();
+  }
+
+  protected void createInput() throws IOException
+  {
+    DataOutputStream out = new DataOutputStream(
+        new FileOutputStream(INPUT_FILE.getAbsoluteFile()));
+    for (int i=0; i<10000; ++i) {
+      out.write(input.getBytes("UTF-8"));
+    }
+    out.close();
+  }
+
+  protected String[] genArgs() {
+    return new String[] {
+      "-input", INPUT_FILE.getAbsolutePath(),
+      "-output", OUTPUT_DIR.getAbsolutePath(),
+      "-mapper", map,
+      "-reducer", "org.apache.hadoop.mapred.lib.IdentityReducer",
+      "-numReduceTasks", "0",
+      "-jobconf", "mapreduce.task.files.preserve.failedtasks=true",
+      "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
+    };
+  }
+
+  @Test
+  public void testUnconsumedInput() throws Exception
+  {
+    String outFileName = "part-00000";
+    File outFile = null;
+    try {
+      try {
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+      } catch (Exception e) {
+      }
+
+      createInput();
+
+      // setup config to ignore unconsumed input
+      Configuration conf = new Configuration();
+      conf.set("stream.minRecWrittenToEnableSkip_", "0");
+
+      job = new StreamJob();
+      job.setConf(conf);
+      int exitCode = job.run(genArgs());
+      assertEquals("Job failed", 0, exitCode);
+      outFile = new File(OUTPUT_DIR, outFileName).getAbsoluteFile();
+      String output = StreamUtil.slurp(outFile);
+      assertEquals("Output was truncated", EXPECTED_OUTPUT_SIZE,
+          StringUtils.countMatches(output, "\t"));
+    } finally {
+      INPUT_FILE.delete();
+      FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
+    }
+  }
+}
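Each record OutputOnlyApp emits contains exactly one tab, so StringUtils.countMatches(output, "\t") equals the number of records that reached part-00000; the assertion fails if the broken pipe truncated any of the 10000 expected records. For reference, the job the test submits corresponds roughly to the streaming invocation below (the jar path is illustrative, the -mapper command is spelled out here rather than built by UtilTest.makeJavaCommand, and the test sets stream.minRecWrittenToEnableSkip_ through its Configuration object instead of on the command line):

$ hadoop jar hadoop-streaming.jar \
    -input stream_uncinput_input.txt \
    -output stream_uncinput_out \
    -mapper 'java org.apache.hadoop.streaming.OutputOnlyApp 10000' \
    -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
    -numReduceTasks 0 \
    -jobconf mapreduce.task.files.preserve.failedtasks=true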