hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amareshw...@apache.org
Subject svn commit: r960446 - in /hadoop/mapreduce/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/test/mapred/org/apache/hadoop/mapred/
Date Mon, 05 Jul 2010 05:09:54 GMT
Author: amareshwari
Date: Mon Jul  5 05:09:53 2010
New Revision: 960446

URL: http://svn.apache.org/viewvc?rev=960446&view=rev
Log:
MAPREDUCE-1888. Fixes Streaming to override output key and value types, only if mapper/reducer
is a command. Contributed by Ravi Gummadi

Added:
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingOutputKeyValueTypes.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestFileArgs.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleArchiveFiles.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCombiner.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCounters.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrAppReduce.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputFormat.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=960446&r1=960445&r2=960446&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Jul  5 05:09:53 2010
@@ -137,6 +137,9 @@ Trunk (unreleased changes)
     MAPREDUCE-1864. Removes uninitialized/unused variables in
     org.apache.hadoop.streaming.PipeMapRed. (amareshwari)
 
+    MAPREDUCE-1888. Fixes Streaming to override output key and value types, 
+    only if mapper/reducer is a command. (Ravi Gummadi via amareshwari)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=960446&r1=960445&r2=960446&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Mon Jul  5 05:09:53 2010
@@ -744,25 +744,15 @@ public class StreamJob implements Tool {
     jobConf_.setClass("stream.reduce.input.writer.class",
       idResolver.getInputWriterClass(), InputWriter.class);
     
-    idResolver.resolve(jobConf_.get("stream.map.output", IdentifierResolver.TEXT_ID));
-    jobConf_.setClass("stream.map.output.reader.class",
-      idResolver.getOutputReaderClass(), OutputReader.class);
-    jobConf_.setMapOutputKeyClass(idResolver.getOutputKeyClass());
-    jobConf_.setMapOutputValueClass(idResolver.getOutputValueClass());
-    
-    idResolver.resolve(jobConf_.get("stream.reduce.output", IdentifierResolver.TEXT_ID));
-    jobConf_.setClass("stream.reduce.output.reader.class",
-      idResolver.getOutputReaderClass(), OutputReader.class);
-    jobConf_.setOutputKeyClass(idResolver.getOutputKeyClass());
-    jobConf_.setOutputValueClass(idResolver.getOutputValueClass());
-    
     jobConf_.set("stream.addenvironment", addTaskEnvironment_);
 
+    boolean isMapperACommand = false;
     if (mapCmd_ != null) {
       c = StreamUtil.goodClassOrNull(jobConf_, mapCmd_, defaultPackage);
       if (c != null) {
         jobConf_.setMapperClass(c);
       } else {
+        isMapperACommand = true;
         jobConf_.setMapperClass(PipeMapper.class);
         jobConf_.setMapRunnerClass(PipeMapRunner.class);
         jobConf_.set("stream.map.streamprocessor", 
@@ -781,25 +771,62 @@ public class StreamJob implements Tool {
       }
     }
 
-    boolean reducerNone_ = false;
-    if (redCmd_ != null) {
-      reducerNone_ = redCmd_.equals(REDUCE_NONE);
-      if (redCmd_.compareToIgnoreCase("aggregate") == 0) {
-        jobConf_.setReducerClass(ValueAggregatorReducer.class);
-        jobConf_.setCombinerClass(ValueAggregatorCombiner.class);
-      } else {
+    if (numReduceTasksSpec_!= null) {
+      int numReduceTasks = Integer.parseInt(numReduceTasksSpec_);
+      jobConf_.setNumReduceTasks(numReduceTasks);
+    }
 
-        c = StreamUtil.goodClassOrNull(jobConf_, redCmd_, defaultPackage);
-        if (c != null) {
-          jobConf_.setReducerClass(c);
+    boolean isReducerACommand = false;
+    if (redCmd_ != null) {
+      if (redCmd_.equals(REDUCE_NONE)) {
+        jobConf_.setNumReduceTasks(0);
+      }
+      if (jobConf_.getNumReduceTasks() != 0) {
+        if (redCmd_.compareToIgnoreCase("aggregate") == 0) {
+          jobConf_.setReducerClass(ValueAggregatorReducer.class);
+          jobConf_.setCombinerClass(ValueAggregatorCombiner.class);
         } else {
-          jobConf_.setReducerClass(PipeReducer.class);
-          jobConf_.set("stream.reduce.streamprocessor", URLEncoder.encode(
-              redCmd_, "UTF-8"));
+
+          c = StreamUtil.goodClassOrNull(jobConf_, redCmd_, defaultPackage);
+          if (c != null) {
+            jobConf_.setReducerClass(c);
+          } else {
+            isReducerACommand = true;
+            jobConf_.setReducerClass(PipeReducer.class);
+            jobConf_.set("stream.reduce.streamprocessor", URLEncoder.encode(
+                redCmd_, "UTF-8"));
+          }
         }
       }
     }
 
+    idResolver.resolve(jobConf_.get("stream.map.output",
+        IdentifierResolver.TEXT_ID));
+    jobConf_.setClass("stream.map.output.reader.class",
+      idResolver.getOutputReaderClass(), OutputReader.class);
+    if (isMapperACommand) {
+      // if mapper is a command, then map output key/value classes come from the
+      // idResolver
+      jobConf_.setMapOutputKeyClass(idResolver.getOutputKeyClass());
+      jobConf_.setMapOutputValueClass(idResolver.getOutputValueClass());
+
+      if (jobConf_.getNumReduceTasks() == 0) {
+        jobConf_.setOutputKeyClass(idResolver.getOutputKeyClass());
+        jobConf_.setOutputValueClass(idResolver.getOutputValueClass());
+      }
+    }
+
+    idResolver.resolve(jobConf_.get("stream.reduce.output",
+        IdentifierResolver.TEXT_ID));
+    jobConf_.setClass("stream.reduce.output.reader.class",
+      idResolver.getOutputReaderClass(), OutputReader.class);
+    if (isReducerACommand) {
+      // if reducer is a command, then output key/value classes come from the
+      // idResolver
+      jobConf_.setOutputKeyClass(idResolver.getOutputKeyClass());
+      jobConf_.setOutputValueClass(idResolver.getOutputValueClass());
+    }
+
     if (inReaderSpec_ != null) {
       String[] args = inReaderSpec_.split(",");
       String readerClass = args[0];
@@ -846,14 +873,6 @@ public class StreamJob implements Tool {
       }
     }
     
-    if (numReduceTasksSpec_!= null) {
-      int numReduceTasks = Integer.parseInt(numReduceTasksSpec_);
-      jobConf_.setNumReduceTasks(numReduceTasks);
-    }
-    if (reducerNone_) {
-      jobConf_.setNumReduceTasks(0);
-    }
-    
     if(mapDebugSpec_ != null){
     	jobConf_.setMapDebugScript(mapDebugSpec_);
     }

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestFileArgs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestFileArgs.java?rev=960446&r1=960445&r2=960446&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestFileArgs.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestFileArgs.java Mon Jul  5 05:09:53 2010
@@ -21,13 +21,14 @@ package org.apache.hadoop.streaming;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.URI;
-import java.util.zip.GZIPOutputStream;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.junit.After;
+import org.junit.Before;
 
 /**
  * This class tests that the '-file' argument to streaming results
@@ -59,8 +60,13 @@ public class TestFileArgs extends TestSt
     strJobTracker = JTConfig.JT_IPC_ADDRESS + "=localhost:" + mr.getJobTrackerPort();
     strNamenode = "fs.default.name=hdfs://" + namenode;
 
+    map = LS_PATH;
     FileSystem.setDefaultUri(conf, "hdfs://" + namenode);
+  }
 
+  @Before
+  @Override
+  public void setUp() throws IOException {
     // Set up side file
     FileSystem localFs = FileSystem.getLocal(conf);
     DataOutputStream dos = localFs.create(new Path("sidefile"));
@@ -68,6 +74,17 @@ public class TestFileArgs extends TestSt
     dos.close();
   }
 
+  @After
+  @Override
+  public void tearDown() {
+    if (mr != null) {
+      mr.shutdown();
+    }
+    if (dfs != null) {
+      dfs.shutdown();
+    }
+  }
+
   @Override
   protected String getExpectedOutput() {
     return EXPECTED_OUTPUT;
@@ -80,22 +97,14 @@ public class TestFileArgs extends TestSt
 
   @Override
   protected String[] genArgs() {
-    return new String[] {
-      "-input", INPUT_FILE.getAbsolutePath(),
-      "-output", OUTPUT_DIR.getAbsolutePath(),
-      "-file", new java.io.File("sidefile").getAbsolutePath(),
-      "-mapper", LS_PATH,
-      "-numReduceTasks", "0",
-      "-jobconf", strNamenode,
-      "-jobconf", strJobTracker,
-      "-jobconf", "stream.tmpdir=" + System.getProperty("test.build.data","/tmp")
-    };
+    args.add("-file");
+    args.add(new java.io.File("sidefile").getAbsolutePath());
+    args.add("-numReduceTasks");
+    args.add("0");
+    args.add("-jobconf");
+    args.add(strNamenode);
+    args.add("-jobconf");
+    args.add(strJobTracker);
+    return super.genArgs();
   }
-
-
-  public static void main(String[]args) throws Exception
-  {
-    new TestFileArgs().testCommandLine();
-  }
-
 }

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java?rev=960446&r1=960445&r2=960446&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java Mon Jul  5 05:09:53 2010
@@ -40,21 +40,4 @@ public class TestGzipInput extends TestS
     out.write(input.getBytes("UTF-8"));
     out.close();
   }
-
-
-  protected String[] genArgs() {
-    return new String[] {
-      "-input", INPUT_FILE.getAbsolutePath(),
-      "-output", OUTPUT_DIR.getAbsolutePath(),
-      "-mapper", map,
-      "-reducer", reduce,
-    };
-    
-  }
-
-  public static void main(String[]args) throws Exception
-  {
-    new TestGzipInput().testCommandLine();
-  }
-
 }

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleArchiveFiles.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleArchiveFiles.java?rev=960446&r1=960445&r2=960446&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleArchiveFiles.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleArchiveFiles.java Mon Jul  5 05:09:53 2010
@@ -82,6 +82,15 @@ public class TestMultipleArchiveFiles ex
     mr  = new MiniMRCluster(1, namenode, 3);
     strJobTracker = JTConfig.JT_IPC_ADDRESS + "=localhost:" + mr.getJobTrackerPort();
     strNamenode = "fs.default.name=" + namenode;
+
+    map = "xargs cat";
+    reduce = "cat";
+  }
+
+  @Override
+  protected void setInputOutput() {
+    inputFile = INPUT_FILE;
+    outDir = OUTPUT_DIR;
   }
   
   protected void createInput() throws IOException
@@ -114,30 +123,20 @@ public class TestMultipleArchiveFiles ex
     String cache1 = workDir + CACHE_ARCHIVE_1 + "#symlink1";
     String cache2 = workDir + CACHE_ARCHIVE_2 + "#symlink2";
 
-    return new String[] {
-      "-input", INPUT_FILE.toString(),
-      "-output", OUTPUT_DIR,
-      "-mapper", "xargs cat", 
-      "-reducer", "cat",
-      "-jobconf", "mapreduce.job.reduces=1",
-      "-cacheArchive", cache1,
-      "-cacheArchive", cache2,
-      "-jobconf", strNamenode,
-      "-jobconf", strJobTracker,
-      "-jobconf", "stream.tmpdir=" + System.getProperty("test.build.data","/tmp")
-    };
+    args.add("-jobconf");
+    args.add("mapreduce.job.reduces=1");
+    args.add("-cacheArchive");
+    args.add(cache1);
+    args.add("-cacheArchive");
+    args.add(cache2);
+    args.add("-jobconf");
+    args.add(strNamenode);
+    args.add("-jobconf");
+    args.add(strJobTracker);
+    return super.genArgs();
   }
 
-  //@Test
-  public void testCommandLine() throws Exception {
-    createInput();
-    String args[] = genArgs();
-    LOG.info("Testing streaming command line:\n" +
-             StringUtils.join(" ", Arrays.asList(args)));
-    job = new StreamJob(genArgs(), true);
-    if(job.go() != 0) {
-      throw new Exception("Job Failed");
-    }
+  protected void checkOutput() throws IOException {
     StringBuffer output = new StringBuffer(256);
     Path[] fileList = FileUtil.stat2Paths(fileSys.listStatus(
                                             new Path(OUTPUT_DIR)));
@@ -147,9 +146,4 @@ public class TestMultipleArchiveFiles ex
     }
     assertEquals(expectedOutput, output.toString());
   }
-
-  public static void main(String[]args) throws Exception
-  {
-    new TestMultipleArchiveFiles().testCommandLine();
-  }
 }

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java?rev=960446&r1=960445&r2=960446&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java Mon Jul  5 05:09:53 2010
@@ -22,26 +22,23 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 
-import org.apache.hadoop.fs.FileUtil;
-
-import org.junit.Test;
-import static org.junit.Assert.*;
-
 /**
  * This class tests StreamXmlRecordReader
  * The test creates an XML file, uses StreamXmlRecordReader and compares
  * the expected output against the generated output
  */
-public class TestStreamXmlRecordReader extends TestStreaming
-{
-
-  private StreamJob job;
+public class TestStreamXmlRecordReader extends TestStreaming {
 
   public TestStreamXmlRecordReader() throws IOException {
     INPUT_FILE = new File("input.xml");
-    input = "<xmltag>\t\nroses.are.red\t\nviolets.are.blue\t\nbunnies.are.pink\t\n</xmltag>\t\n";
+    input = "<xmltag>\t\nroses.are.red\t\nviolets.are.blue\t\n" +
+        "bunnies.are.pink\t\n</xmltag>\t\n";
+    map = "cat";
+    reduce = "NONE";
+    outputExpect = input;
   }
-  
+
+  @Override
   protected void createInput() throws IOException
   {
     FileOutputStream out = new FileOutputStream(INPUT_FILE.getAbsoluteFile());
@@ -53,42 +50,10 @@ public class TestStreamXmlRecordReader e
     out.close();
   }
 
+  @Override
   protected String[] genArgs() {
-    return new String[] {
-      "-input", INPUT_FILE.getAbsolutePath(),
-      "-output", OUTPUT_DIR.getAbsolutePath(),
-      "-mapper","cat", 
-      "-reducer", "NONE", 
-      "-inputreader", "StreamXmlRecordReader,begin=<xmltag>,end=</xmltag>"
-    };
-  }
-
-  @Test
-  public void testCommandLine() throws Exception {
-    try {
-      try {
-        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
-      } catch (Exception e) {
-      }
-      createInput();
-      job = new StreamJob(genArgs(), false);
-      job.go();
-      File outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
-      String output = StreamUtil.slurp(outFile);
-      outFile.delete();
-      assertEquals(input, output);
-    } finally {
-      try {
-        INPUT_FILE.delete();
-        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-    }
-  }
-
-  public static void main(String[]args) throws Exception
-  {
-    new TestStreamXmlRecordReader().testCommandLine();
+    args.add("-inputreader");
+    args.add("StreamXmlRecordReader,begin=<xmltag>,end=</xmltag>");
+    return super.genArgs();
   }
 }

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java?rev=960446&r1=960445&r2=960446&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java Mon Jul  5 05:09:53 2010
@@ -19,11 +19,13 @@
 package org.apache.hadoop.streaming;
 
 import java.io.*;
+import java.util.ArrayList;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import static org.junit.Assert.*;
 
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.Configuration;
@@ -40,6 +42,8 @@ public class TestStreaming
   protected File TEST_DIR;
   protected File INPUT_FILE;
   protected File OUTPUT_DIR;
+  protected String inputFile;
+  protected String outDir;
   protected String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
   // map behaves like "/usr/bin/tr . \\n"; (split words into lines)
   protected String map = StreamUtil.makeJavaCommand(TrApp.class, new String[]{".", "\\n"});
@@ -48,6 +52,7 @@ public class TestStreaming
   protected String reduce = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{"R"});
   protected String outputExpect = "Rare\t\nRblue\t\nRbunnies\t\nRpink\t\nRred\t\nRroses\t\nRviolets\t\n";
 
+  protected ArrayList<String> args = new ArrayList<String>();
   protected StreamJob job;
 
   public TestStreaming() throws IOException
@@ -60,6 +65,17 @@ public class TestStreaming
     INPUT_FILE = new File(TEST_DIR, "input.txt");
   }
 
+  @Before
+  public void setUp() throws IOException {
+    UtilTest.recursiveDelete(TEST_DIR);
+    assertTrue("Creating " + TEST_DIR, TEST_DIR.mkdirs());
+  }
+
+  @After
+  public void tearDown() {
+    UtilTest.recursiveDelete(TEST_DIR);
+  }
+
   protected String getInputData() {
     return input;
   }
@@ -72,15 +88,25 @@ public class TestStreaming
     out.close();
   }
 
+  protected void setInputOutput() {
+    inputFile = INPUT_FILE.getAbsolutePath();
+    outDir = OUTPUT_DIR.getAbsolutePath();
+  }
+
   protected String[] genArgs() {
-    return new String[] {
-      "-input", INPUT_FILE.getAbsolutePath(),
-      "-output", OUTPUT_DIR.getAbsolutePath(),
-      "-mapper", map,
-      "-reducer", reduce,
-      "-jobconf", "mapreduce.task.files.preserve.failedtasks=true",
-      "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
-    };
+    setInputOutput();
+    args.add("-input");args.add(inputFile);
+    args.add("-output");args.add(outDir);
+    args.add("-mapper");args.add(map);
+    args.add("-reducer");args.add(reduce);
+    args.add("-jobconf");
+    args.add("mapreduce.task.files.preserve.failedtasks=true");
+    args.add("-jobconf");
+    args.add("stream.tmpdir="+System.getProperty("test.build.data","/tmp"));
+
+    String str[] = new String [args.size()];
+    args.toArray(str);
+    return str;
   }
 
   protected Configuration getConf() {
@@ -105,25 +131,26 @@ public class TestStreaming
     assertEquals(getExpectedOutput(), output);
   }
 
-  @Test
-  public void testCommandLine() throws Exception
-  {
-    UtilTest.recursiveDelete(TEST_DIR);
-    assertTrue("Creating " + TEST_DIR, TEST_DIR.mkdirs());
+  /**
+   * Runs a streaming job with the given arguments
+   * @return the streaming job return status
+   * @throws IOException
+   */
+  protected int runStreamJob() throws IOException {
     createInput();
     boolean mayExit = false;
 
     // During tests, the default Configuration will use a local mapred
     // So don't specify -config or -cluster
     job = new StreamJob(genArgs(), mayExit);
-    int ret = job.go();
-    assertEquals(0, ret);
-    checkOutput();
+    return job.go();
   }
 
-  public static void main(String[]args) throws Exception
+  @Test
+  public void testCommandLine() throws Exception
   {
-    new TestStreaming().testCommandLine();
+    int ret = runStreamJob();
+    assertEquals(0, ret);
+    checkOutput();
   }
-
 }

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCombiner.java?rev=960446&r1=960445&r2=960446&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCombiner.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCombiner.java Mon Jul  5 05:09:53 2010
@@ -27,25 +27,21 @@ import static org.junit.Assert.*;
 
 public class TestStreamingCombiner extends TestStreaming {
 
-  protected String combine = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{""});
+  protected String combine = StreamUtil.makeJavaCommand(
+      UniqApp.class, new String[]{""});
   
   public TestStreamingCombiner() throws IOException {
     super();
   }
   
   protected String[] genArgs() {
-    return new String[] {
-      "-input", INPUT_FILE.getAbsolutePath(),
-      "-output", OUTPUT_DIR.getAbsolutePath(),
-      "-mapper", map,
-      "-reducer", reduce,
-      "-combiner", combine,
-      "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
-    };
+    args.add("-combiner");
+    args.add(combine);
+    return super.genArgs();
   }
 
   @Test
-  public void testCommandLine() throws Exception  {
+  public void testCommandLine() throws Exception {
     super.testCommandLine();
     // validate combiner counters
     String counterGrp = "org.apache.hadoop.mapred.Task$Counter";
@@ -55,10 +51,4 @@ public class TestStreamingCombiner exten
     assertTrue(counters.findCounter(
                counterGrp, "COMBINE_OUTPUT_RECORDS").getValue() != 0);
   }
-
-  public static void main(String[]args) throws Exception
-  {
-    new TestStreamingCombiner().testCommandLine();
-  }
-
 }

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCounters.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCounters.java?rev=960446&r1=960445&r2=960446&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCounters.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCounters.java Mon Jul  5 05:09:53 2010
@@ -21,10 +21,8 @@ package org.apache.hadoop.streaming;
 import org.junit.Test;
 import static org.junit.Assert.*;
 
-import java.io.File;
 import java.io.IOException;
 
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.Counters.Group;
@@ -38,41 +36,18 @@ public class TestStreamingCounters exten
   }
 
   @Test
-  public void testCommandLine() throws IOException
-  {
-    try {
-      try {
-        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
-      } catch (Exception e) {
-      }
-
-      createInput();
-      boolean mayExit = false;
-
-      // During tests, the default Configuration will use a local mapred
-      // So don't specify -config or -cluster
-      StreamJob job = new StreamJob(genArgs(), mayExit);      
-      job.go();
-      File outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
-      String output = StreamUtil.slurp(outFile);
-      outFile.delete();
-      assertEquals(outputExpect, output);
-      
-      Counters counters = job.running_.getCounters();
-      assertNotNull("Counters", counters);
-      Group group = counters.getGroup("UserCounters");
-      assertNotNull("Group", group);
-      Counter counter = group.getCounterForName("InputLines");
-      assertNotNull("Counter", counter);
-      assertEquals(3, counter.getCounter());
-    } finally {
-      try {
-        INPUT_FILE.delete();
-        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-    }
+  public void testCommandLine() throws Exception {
+    super.testCommandLine();
+    validateCounters();
   }
   
+  private void validateCounters() throws IOException {
+    Counters counters = job.running_.getCounters();
+    assertNotNull("Counters", counters);
+    Group group = counters.getGroup("UserCounters");
+    assertNotNull("Group", group);
+    Counter counter = group.getCounterForName("InputLines");
+    assertNotNull("Counter", counter);
+    assertEquals(3, counter.getCounter());
+  }
 }

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java?rev=960446&r1=960445&r2=960446&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java Mon Jul  5 05:09:53 2010
@@ -19,14 +19,10 @@
 package org.apache.hadoop.streaming;
 
 import org.junit.Test;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
-import java.io.*;
-import java.util.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
+import java.io.File;
+import java.io.IOException;
 
 /**
  * This class tests if hadoopStreaming returns Exception 
@@ -37,55 +33,23 @@ import org.apache.hadoop.fs.Path;
 public class TestStreamingFailure extends TestStreaming
 {
 
-  protected File INVALID_INPUT_FILE;// = new File("invalid_input.txt");
-  private StreamJob job;
+  protected File INVALID_INPUT_FILE;
 
   public TestStreamingFailure() throws IOException
   {
     INVALID_INPUT_FILE = new File("invalid_input.txt");
   }
 
-  protected String[] genArgs() {
-    return new String[] {
-      "-input", INVALID_INPUT_FILE.getAbsolutePath(),
-      "-output", OUTPUT_DIR.getAbsolutePath(),
-      "-mapper", map,
-      "-reducer", reduce,
-      "-jobconf", "mapreduce.task.files.preserve.failedtasks=true",
-      "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
-    };
+  @Override
+  protected void setInputOutput() {
+    inputFile = INVALID_INPUT_FILE.getAbsolutePath();
+    outDir = OUTPUT_DIR.getAbsolutePath();
   }
 
+  @Override
   @Test
-  public void testCommandLine()
-  {
-    try {
-      try {
-        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
-      } catch (Exception e) {
-      }
-
-      boolean mayExit = false;
-      int returnStatus = 0;
-
-      // During tests, the default Configuration will use a local mapred
-      // So don't specify -config or -cluster
-      job = new StreamJob(genArgs(), mayExit);      
-      returnStatus = job.go();
-      assertEquals("Streaming Job Failure code expected", 5, returnStatus);
-    } catch(Exception e) {
-      // Expecting an exception
-    } finally {
-      try {
-      FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-    }
-  }
-
-  public static void main(String[]args) throws Exception
-  {
-      new TestStreamingFailure().testCommandLine();
+  public void testCommandLine() throws IOException {
+    int returnStatus = runStreamJob();
+    assertEquals("Streaming Job Failure code expected", 5, returnStatus);
   }
 }

Added: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingOutputKeyValueTypes.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingOutputKeyValueTypes.java?rev=960446&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingOutputKeyValueTypes.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingOutputKeyValueTypes.java Mon Jul  5 05:09:53 2010
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Tests streaming jobs that use Java tasks or commands as mapper and reducer,
+ * in MapReduce local mode. Validates that the user-set config properties
+ * {@link MRJobConfig#MAP_OUTPUT_KEY_CLASS} and
+ * {@link MRJobConfig#OUTPUT_KEY_CLASS} are honored by streaming jobs.
+ */
+public class TestStreamingOutputKeyValueTypes extends TestStreaming {
+
+  public TestStreamingOutputKeyValueTypes() throws IOException {
+    super();
+    input = "one line dummy input\n";
+  }
+
+  @Before
+  @Override
+  public void setUp() throws IOException {
+    args.clear();
+    super.setUp();
+  }
+
+  @Override
+  protected String[] genArgs() {
+    // Set the testcase-specific config properties first; the remaining
+    // arguments are added by TestStreaming.genArgs().
+    args.add("-jobconf");
+    args.add(MRJobConfig.MAP_OUTPUT_KEY_CLASS +
+        "=org.apache.hadoop.io.LongWritable");
+    args.add("-jobconf");
+    args.add(MRJobConfig.OUTPUT_KEY_CLASS +
+        "=org.apache.hadoop.io.LongWritable");
+
+    // Using SequenceFileOutputFormat here because with TextOutputFormat, the
+    // mapred.output.key.class set in JobConf (which we want to test here) is
+    // not read/used at all.
+    args.add("-outputformat");
+    args.add("org.apache.hadoop.mapred.SequenceFileOutputFormat");
+
+    return super.genArgs();
+  }
+
+  @Override
+  protected void checkOutput() throws IOException {
+    // No need to validate output for the test cases in this class
+  }
+
+  public static class MyReducer<K, V>
+  extends MapReduceBase implements Reducer<K, V, LongWritable, Text> {
+
+    public void reduce(K key, Iterator<V> values,
+        OutputCollector<LongWritable, Text> output, Reporter reporter)
+        throws IOException {
+      LongWritable l = new LongWritable();
+      while (values.hasNext()) {
+        output.collect(l, new Text(values.next().toString()));
+      }
+    }
+  }
+
+  // Check with Java Mapper, Java Reducer
+  @Test
+  public void testJavaMapperAndJavaReducer() throws Exception {
+    map = "org.apache.hadoop.mapred.lib.IdentityMapper";
+    reduce = "org.apache.hadoop.mapred.lib.IdentityReducer";
+    super.testCommandLine();
+  }
+
+  // Check with Java Mapper, Java Reducer and -numReduceTasks 0
+  @Test
+  public void testJavaMapperAndJavaReducerAndZeroReduces() throws Exception {
+    map = "org.apache.hadoop.mapred.lib.IdentityMapper";
+    reduce = "org.apache.hadoop.mapred.lib.IdentityReducer";
+    args.add("-numReduceTasks");
+    args.add("0");
+    super.testCommandLine();
+  }
+
+  // Check with Java Mapper, Reducer = "NONE"
+  @Test
+  public void testJavaMapperWithReduceNone() throws Exception {
+    map = "org.apache.hadoop.mapred.lib.IdentityMapper";
+    reduce = "NONE";
+    super.testCommandLine();
+  }
+
+  // Check with Java Mapper, command Reducer
+  @Test
+  public void testJavaMapperAndCommandReducer() throws Exception {
+    map = "org.apache.hadoop.mapred.lib.IdentityMapper";
+    reduce = "cat";
+    super.testCommandLine();
+  }
+
+  // Check with Java Mapper, command Reducer and -numReduceTasks 0
+  @Test
+  public void testJavaMapperAndCommandReducerAndZeroReduces() throws Exception {
+    map = "org.apache.hadoop.mapred.lib.IdentityMapper";
+    reduce = "cat";
+    args.add("-numReduceTasks");
+    args.add("0");
+    super.testCommandLine();
+  }
+
+  // Check with Command Mapper, Java Reducer
+  @Test
+  public void testCommandMapperAndJavaReducer() throws Exception {
+    map = "cat";
+    reduce = MyReducer.class.getName();
+    super.testCommandLine();
+  }
+
+  // Check with Command Mapper, Java Reducer and -numReduceTasks 0
+  @Test
+  public void testCommandMapperAndJavaReducerAndZeroReduces() throws Exception {
+    map = "cat";
+    reduce = MyReducer.class.getName();
+    args.add("-numReduceTasks");
+    args.add("0");
+    super.testCommandLine();
+  }
+
+  // Check with Command Mapper, Reducer = "NONE"
+  @Test
+  public void testCommandMapperWithReduceNone() throws Exception {
+    map = "cat";
+    reduce = "NONE";
+    super.testCommandLine();
+  }
+
+  // Check with Command Mapper, Command Reducer
+  @Test
+  public void testCommandMapperAndCommandReducer() throws Exception {
+    map = "cat";
+    reduce = "cat";
+    super.testCommandLine();
+  }
+
+  // Check with Command Mapper, Command Reducer and -numReduceTasks 0
+  @Test
+  public void testCommandMapperAndCommandReducerAndZeroReduces()
+      throws Exception {
+    map = "cat";
+    reduce = "cat";
+    args.add("-numReduceTasks");
+    args.add("0");
+    super.testCommandLine();
+  }
+
+  @Override
+  @Test
+  public void testCommandLine() {
+    // Intentionally empty: tests above call super.testCommandLine() directly.
+  }
+}

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java?rev=960446&r1=960445&r2=960446&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
Mon Jul  5 05:09:53 2010
@@ -23,8 +23,9 @@ import java.io.*;
 import org.apache.hadoop.streaming.Environment;
 
 /** A minimal Java implementation of /usr/bin/tr.
-    Used to test the usage of external applications without adding
-    platform-specific dependencies.
+ *  Used to test the usage of external applications without adding
+ *  platform-specific dependencies.
+ *  Use TrApp as mapper only. For reducer, use TrAppReduce.
  */
 public class TrApp
 {
@@ -43,8 +44,8 @@ public class TrApp
     // property names have been escaped in PipeMapRed.safeEnvVarName()
     expectDefined("mapreduce_cluster_local_dir");
     expect("mapred_output_format_class", "org.apache.hadoop.mapred.TextOutputFormat");
-    expect("mapreduce_job_output_key_class", "org.apache.hadoop.io.Text");
-    expect("mapreduce_job_output_value_class", "org.apache.hadoop.io.Text");
+    expect("mapreduce_map_output_key_class", "org.apache.hadoop.io.Text");
+    expect("mapreduce_map_output_value_class", "org.apache.hadoop.io.Text");
 
     expect("mapreduce_task_ismap", "true");
     expectDefined("mapreduce_task_attempt_id");

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrAppReduce.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrAppReduce.java?rev=960446&r1=960445&r2=960446&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrAppReduce.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrAppReduce.java
Mon Jul  5 05:09:53 2010
@@ -23,8 +23,9 @@ import java.io.*;
 import org.apache.hadoop.streaming.Environment;
 
 /** A minimal Java implementation of /usr/bin/tr.
-    Used to test the usage of external applications without adding
-    platform-specific dependencies.
+ *  Used to test the usage of external applications without adding
+ *  platform-specific dependencies.
+ *  Use TrAppReduce as reducer only. For mapper, use TrApp.
  */
 public class TrAppReduce
 {

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputFormat.java?rev=960446&r1=960445&r2=960446&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputFormat.java
(original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputFormat.java
Mon Jul  5 05:09:53 2010
@@ -67,9 +67,6 @@ public class TestFileOutputFormat extend
     conf.setJobName("fof");
     conf.setInputFormat(TextInputFormat.class);
 
-    conf.setOutputKeyClass(LongWritable.class);
-    conf.setOutputValueClass(Text.class);
-
     conf.setMapOutputKeyClass(LongWritable.class);
     conf.setMapOutputValueClass(Text.class);
 



Mime
View raw message