hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomwh...@apache.org
Subject svn commit: r644678 - in /hadoop/core/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/
Date Fri, 04 Apr 2008 10:50:28 GMT
Author: tomwhite
Date: Fri Apr  4 03:50:26 2008
New Revision: 644678

URL: http://svn.apache.org/viewvc?rev=644678&view=rev
Log:
HADOOP-3089.  Streaming should accept stderr from task before first key arrives.  Contributed
by Rick Cox.

Added:
    hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/StderrApp.java
    hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingStderr.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=644678&r1=644677&r2=644678&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Apr  4 03:50:26 2008
@@ -464,6 +464,9 @@
     HADOOP-3123. Fix the native library build scripts to work on Solaris.
     (tomwhite via omalley)
 
+    HADOOP-3089.  Streaming should accept stderr from task before
+    first key arrives.  (Rick Cox via tomwhite)
+
 Release 0.16.2 - 2008-04-02
 
   BUG FIXES

Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=644678&r1=644677&r2=644678&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Fri Apr  4 03:50:26 2008
@@ -188,6 +188,8 @@
       clientErr_ = new DataInputStream(new BufferedInputStream(sim.getErrorStream()));
       startTime_ = System.currentTimeMillis();
 
+      errThread_ = new MRErrorThread();
+      errThread_.start();
     } catch (Exception e) {
       logStackTrace(e);
       LOG.error("configuration exception", e);
@@ -318,8 +320,7 @@
   void startOutputThreads(OutputCollector output, Reporter reporter) {
     outThread_ = new MROutputThread(output, reporter);
     outThread_.start();
-    errThread_ = new MRErrorThread(reporter);
-    errThread_.start();
+    errThread_.setReporter(reporter);
   }
 
   void waitOutputThreads() {
@@ -423,11 +424,14 @@
 
   class MRErrorThread extends Thread {
 
-    public MRErrorThread(Reporter reporter) {
-      this.reporter = reporter;
+    public MRErrorThread() {
       setDaemon(true);
     }
-
+    
+    public void setReporter(Reporter reporter) {
+      this.reporter = reporter;
+    }
+      
     public void run() {
       byte[] line;
       try {
@@ -435,7 +439,7 @@
           String lineStr = new String(line, "UTF-8");
           System.err.println(lineStr);
           long now = System.currentTimeMillis(); 
-          if (now-lastStderrReport > reporterErrDelay_) {
+          if (reporter != null && now-lastStderrReport > reporterErrDelay_) {
             lastStderrReport = now;
             reporter.progress();
           }
@@ -459,7 +463,7 @@
       }
     }
     long lastStderrReport = 0;
-    Reporter reporter;
+    volatile Reporter reporter;
   }
 
   public void mapRedFinished() {

Added: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/StderrApp.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/StderrApp.java?rev=644678&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/StderrApp.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/StderrApp.java Fri Apr  4 03:50:26 2008
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+
+/**
+ * Output an arbitrary number of stderr lines before or after
+ * consuming the keys/values from stdin.
+ */
+public class StderrApp
+{
+  /**
+   * Print preWriteLines to stderr, pausing sleep ms between each
+   * output, then consume stdin and echo it to stdout, then write
+   * postWriteLines to stderr.
+   */
+  public static void go(int preWriteLines, int sleep, int postWriteLines) throws IOException {
+    BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
+    String line;
+       
+    while (preWriteLines > 0) {
+      --preWriteLines;
+      System.err.println("some stderr output before reading input, "
+                         + preWriteLines + " lines remaining, sleeping " + sleep);
+      try {
+        Thread.sleep(sleep);
+      } catch (InterruptedException e) {}
+    }
+    
+    while ((line = in.readLine()) != null) {
+      System.out.println(line);
+    }
+    
+    while (postWriteLines > 0) {
+      --postWriteLines;
+      System.err.println("some stderr output after reading input, lines remaining "
+                         + postWriteLines);
+    }
+  }
+
+  public static void main(String[] args) throws IOException {
+    if (args.length < 3) {
+      System.err.println("Usage: StderrApp PREWRITE SLEEP POSTWRITE");
+      return;
+    }
+    int preWriteLines = Integer.parseInt(args[0]);
+    int sleep = Integer.parseInt(args[1]);
+    int postWriteLines = Integer.parseInt(args[2]);
+    
+    go(preWriteLines, sleep, postWriteLines);
+  }
+}

Added: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingStderr.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingStderr.java?rev=644678&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingStderr.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingStderr.java Fri Apr  4 03:50:26 2008
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Test that streaming consumes stderr from the streaming process
+ * (before, during, and after the main processing of mapred input),
+ * and that stderr messages count as task progress.
+ */
+public class TestStreamingStderr extends TestCase
+{
+  public TestStreamingStderr() throws IOException {
+    UtilTest utilTest = new UtilTest(getClass().getName());
+    utilTest.checkUserDir();
+    utilTest.redirectIfAntJunit();
+  }
+
+  protected String[] genArgs(File input, File output, int preLines, int duringLines, int postLines) {
+    return new String[] {
+      "-input", input.getAbsolutePath(),
+      "-output", output.getAbsolutePath(),
+      "-mapper", StreamUtil.makeJavaCommand(StderrApp.class,
+                                            new String[]{Integer.toString(preLines),
+                                                         Integer.toString(duringLines),
+                                                         Integer.toString(postLines)}),
+      "-reducer", StreamJob.REDUCE_NONE,
+      "-jobconf", "keep.failed.task.files=true",
+      "-jobconf", "mapred.task.timeout=5000",
+      "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
+    };
+  }
+
+  protected File setupInput(String base, boolean hasInput) throws IOException {
+    File input = new File(base + "-input.txt");
+    UtilTest.recursiveDelete(input);
+    FileOutputStream in = new FileOutputStream(input.getAbsoluteFile());
+    if (hasInput) {
+      in.write("hello\n".getBytes());      
+    }
+    in.close();
+    return input;
+  }
+  
+  protected File setupOutput(String base) throws IOException {
+    File output = new File(base + "-out");
+    UtilTest.recursiveDelete(output);
+    return output;
+  }
+
+  public void runStreamJob(String baseName, boolean hasInput,
+                           int preLines, int duringLines, int postLines) {
+    try {
+      File input = setupInput(baseName, hasInput);
+      File output = setupOutput(baseName);
+      boolean mayExit = false;
+      int returnStatus = 0;
+
+      StreamJob job = new StreamJob(genArgs(input, output, preLines, duringLines, postLines), mayExit);
+      returnStatus = job.go();
+      assertEquals("StreamJob success", 0, returnStatus);
+    } catch (Exception e) {
+      failTrace(e);
+    }
+  }
+
+  // This test will fail by blocking forever if the stderr isn't
+  // consumed by Hadoop for tasks that don't have any input.
+  public void testStderrNoInput() throws IOException {
+    runStreamJob("stderr-pre", false, 10000, 0, 0);
+  }
+
+  // Streaming should continue to read stderr even after all input has
+  // been consumed.
+  public void testStderrAfterOutput() throws IOException {
+    runStreamJob("stderr-post", false, 0, 0, 10000);
+  }
+
+  // This test should produce a task timeout if stderr lines aren't
+  // counted as progress. This won't actually work until
+  // LocalJobRunner supports timeouts.
+  public void testStderrCountsAsProgress() throws IOException {
+    runStreamJob("stderr-progress", true, 10, 1000, 0);
+  }
+  
+  protected void failTrace(Exception e) {
+    StringWriter sw = new StringWriter();
+    e.printStackTrace(new PrintWriter(sw));
+    fail(sw.toString());
+  }
+}



Mime
View raw message