hadoop-common-commits mailing list archives

From: cutt...@apache.org
Subject: svn commit: r483293 - in /lucene/hadoop/trunk: ./ src/contrib/ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/
Date: Wed, 06 Dec 2006 23:44:36 GMT
Author: cutting
Date: Wed Dec  6 15:44:32 2006
New Revision: 483293

URL: http://svn.apache.org/viewvc?view=rev&rev=483293
Log:
HADOOP-779.  Fix contrib/streaming to work correctly with gzipped input.  Contributed by Hairong.
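
As a quick orientation (a hedged sketch, not part of the commit): the jobconf option exercised by the new test below is what selects the gzip record reader. StreamJob's two-argument constructor and go() are taken from the diffs that follow; the paths and the mapper/reducer commands are placeholders.

    import org.apache.hadoop.streaming.StreamJob;

    // Sketch: run a streaming job over gzipped input in local mode.
    // Paths and the mapper/reducer commands are placeholders.
    public class GzipStreamingExample {
      public static void main(String[] args) throws Exception {
        String[] argv = new String[] {
            "-input", "/data/input.gz",
            "-output", "/data/out",
            "-mapper", "/bin/cat",
            "-reducer", "/usr/bin/uniq",
            "-jobconf", "stream.recordreader.compression=gzip"
        };
        StreamJob job = new StreamJob(argv, false); // mayExit=false when embedded
        job.go();
      }
    }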

Added:
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/build-contrib.xml
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=483293&r1=483292&r2=483293
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Dec  6 15:44:32 2006
@@ -5,6 +5,9 @@
  1. HADOOP-780. Use ReflectionUtils to instantiate key and value
     objects. (ab)
 
+ 2. HADOOP-779. Fix contrib/streaming to work correctly with gzipped
+    input files.  (Hairong Kuang via cutting)
+
 
 Release 0.9.0 - 2006-12-01
 

Modified: lucene/hadoop/trunk/src/contrib/build-contrib.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/build-contrib.xml?view=diff&rev=483293&r1=483292&r2=483293
==============================================================================
--- lucene/hadoop/trunk/src/contrib/build-contrib.xml (original)
+++ lucene/hadoop/trunk/src/contrib/build-contrib.xml Wed Dec  6 15:44:32 2006
@@ -103,7 +103,7 @@
      srcdir="${src.test}"
      includes="**/*.java"
      destdir="${build.test}"
-     debug="${debug}">
+     debug="${javac.debug}">
       <classpath refid="test.classpath"/>
     </javac>
   </target>

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java?view=diff&rev=483293&r1=483292&r2=483293
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java Wed Dec  6 15:44:32 2006
@@ -45,7 +45,7 @@
     super(in, split, reporter, job, fs);
     gzipped_ = StreamInputFormat.isGzippedInput(job);
     if (gzipped_) {
-      din_ = new DataInputStream(new GZIPInputStream(in_));
+      din_ = new BufferedInputStream( (new GZIPInputStream(in_) ) );
     } else {
       din_ = in_;
     }
@@ -88,40 +88,24 @@
     Text tValue = (Text) value;
     byte[] line;
 
-    while (true) {
-      if (gzipped_) {
-        // figure EOS from readLine
-      } else {
-        long pos = in_.getPos();
-        if (pos >= end_) return false;
-      }
-
-      line = UTF8ByteArrayUtils.readLine((InputStream) in_);
-      if (line == null) return false;
-      try {
-        Text.validateUTF8(line);
-      } catch (MalformedInputException m) {
-        System.err.println("line=" + line + "|" + new Text(line));
-        System.out.flush();
-      }
-      try {
-        int tab = UTF8ByteArrayUtils.findTab(line);
-        if (tab == -1) {
-          tKey.set(line);
-          tValue.set("");
-        } else {
-          UTF8ByteArrayUtils.splitKeyVal(line, tKey, tValue, tab);
-        }
-        break;
-      } catch (MalformedInputException e) {
-        LOG.warn(StringUtils.stringifyException(e));
-      }
+    if ( !gzipped_  ) {
+      long pos = in_.getPos();
+      if (pos >= end_) return false;
+    }
+    
+    line = UTF8ByteArrayUtils.readLine((InputStream) din_);
+    if (line == null) return false;
+    int tab = UTF8ByteArrayUtils.findTab(line);
+    if (tab == -1) {
+      tKey.set(line);
+      tValue.set("");
+    } else {
+      UTF8ByteArrayUtils.splitKeyVal(line, tKey, tValue, tab);
     }
     numRecStats(line, 0, line.length);
     return true;
   }
 
   boolean gzipped_;
-  GZIPInputStream zin_;
-  DataInputStream din_; // GZIP or plain  
+  InputStream din_; // GZIP or plain  
 }
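
The core of the fix is visible above: every read now goes through din_, which is either the raw input or a BufferedInputStream over a GZIPInputStream, and for gzipped input end-of-stream is detected by readLine() returning null rather than by comparing getPos() against the split end, since positions in the compressed stream don't correspond to uncompressed record boundaries. A self-contained sketch of the same pattern outside Hadoop, assuming a local file whose name ends in .gz:

    import java.io.*;
    import java.util.zip.GZIPInputStream;

    // Standalone illustration of the reader's stream selection; not Hadoop code.
    public class GzipAwareLineReader {
      public static void main(String[] args) throws IOException {
        File f = new File(args[0]);
        InputStream in = new FileInputStream(f);
        if (f.getName().endsWith(".gz")) {
          // Same wrapping as the patched constructor: buffer the decompressor.
          in = new BufferedInputStream(new GZIPInputStream(in));
        }
        BufferedReader r = new BufferedReader(new InputStreamReader(in, "UTF-8"));
        String line;
        // As in next(): a null line, not a byte offset, signals end of input.
        while ((line = r.readLine()) != null) {
          System.out.println(line);
        }
        r.close();
      }
    }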

Added: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java?view=auto&rev=483293
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java Wed Dec  6 15:44:32 2006
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * This class tests gzip input streaming in MapReduce local mode.
+ */
+public class TestGzipInput extends TestStreaming
+{
+
+  public TestGzipInput() throws IOException {
+  }
+  
+  protected void createInput() throws IOException
+  {
+    GZIPOutputStream out = new GZIPOutputStream(
+        new FileOutputStream(INPUT_FILE.getAbsoluteFile()));
+    out.write(input.getBytes("UTF-8"));
+    out.close();
+  }
+
+  protected String[] genArgs() {
+    return new String[] {
+        "-input", INPUT_FILE.getAbsolutePath(),
+        "-output", OUTPUT_DIR.getAbsolutePath(),
+        "-mapper", map,
+        "-combiner", combine,
+        "-reducer", reduce,
+        "-jobconf", "stream.recordreader.compression=gzip"
+    };
+    
+  }
+
+  public static void main(String[]args) throws Exception
+  {
+    new TestGzipInput().testCommandLine();
+  }
+
+}
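
The test is deliberately thin: it reuses the whole TestStreaming pipeline and overrides only how the fixture is written and which arguments are passed. A minimal sketch of the round trip it exercises at the I/O level, using the same fixture string: UTF-8 text written through GZIPOutputStream, read back through a buffered GZIPInputStream, mirroring the reader change above.

    import java.io.*;
    import java.util.zip.*;

    // Round-trip sketch: gzip-compress the fixture, then decompress it
    // the way StreamLineRecordReader now opens gzipped input.
    public class GzipRoundTrip {
      public static void main(String[] args) throws IOException {
        File f = File.createTempFile("gzinput", ".gz");
        String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
        GZIPOutputStream out = new GZIPOutputStream(new FileOutputStream(f));
        out.write(input.getBytes("UTF-8"));
        out.close();

        InputStream in = new BufferedInputStream(
            new GZIPInputStream(new FileInputStream(f)));
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        int b;
        while ((b = in.read()) != -1) {
          buf.write(b);
        }
        in.close();
        System.out.println(buf.toString("UTF-8").equals(input)); // prints true
        f.delete();
      }
    }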

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java?view=diff&rev=483293&r1=483292&r2=483293
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java Wed Dec  6 15:44:32 2006
@@ -33,17 +33,17 @@
 
   // "map" command: grep -E (red|green|blue)
   // reduce command: uniq
-  String INPUT_FILE = "input.txt";
-  String OUTPUT_DIR = "out";
-  String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
+  protected File INPUT_FILE = new File("input.txt");
+  protected File OUTPUT_DIR = new File("out");
+  protected String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
   // map behaves like "/usr/bin/tr . \\n"; (split words into lines)
-  String map = StreamUtil.makeJavaCommand(TrApp.class, new String[]{".", "\\n"});
+  protected String map = StreamUtil.makeJavaCommand(TrApp.class, new String[]{".", "\\n"});
   // combine, reduce behave like /usr/bin/uniq. But also prepend lines with C, R.
-  String combine  = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{"C"});
-  String reduce = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{"R"});
-  String outputExpect = "RCare\t\nRCblue\t\nRCbunnies\t\nRCpink\t\nRCred\t\nRCroses\t\nRCviolets\t\n";
+  protected String combine  = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{"C"});
+  protected String reduce = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{"R"});
+  protected String outputExpect = "RCare\t\nRCblue\t\nRCbunnies\t\nRCpink\t\nRCred\t\nRCroses\t\nRCviolets\t\n";
 
-  StreamJob job;
+  private StreamJob job;
 
   public TestStreaming() throws IOException
   {
@@ -52,14 +52,27 @@
     utilTest.redirectIfAntJunit();
   }
 
-  void createInput() throws IOException
+  protected void createInput() throws IOException
   {
-    String path = new File(".", INPUT_FILE).getAbsolutePath();// needed from junit forked vm
-    DataOutputStream out = new DataOutputStream(new FileOutputStream(path));
+    DataOutputStream out = new DataOutputStream(
+        new FileOutputStream(INPUT_FILE.getAbsoluteFile()));
     out.write(input.getBytes("UTF-8"));
     out.close();
   }
 
+  protected String[] genArgs() {
+    return new String[] {
+        "-input", INPUT_FILE.getAbsolutePath(),
+        "-output", OUTPUT_DIR.getAbsolutePath(),
+        "-mapper", map,
+        "-combiner", combine,
+        "-reducer", reduce,
+        //"-verbose",
+        //"-jobconf", "stream.debug=set"
+        "-jobconf", "keep.failed.task.files=true"
+        };
+  }
+  
   public void testCommandLine()
   {
     try {
@@ -68,30 +81,23 @@
 
       // During tests, the default Configuration will use a local mapred
       // So don't specify -config or -cluster
-      String argv[] = new String[] {
-          "-input", INPUT_FILE,
-          "-output", OUTPUT_DIR,
-          "-mapper", map,
-          "-combiner", combine,
-          "-reducer", reduce,
-          //"-verbose",
-          //"-jobconf", "stream.debug=set"
-          "-jobconf", "keep.failed.task.files=true",
-      };
-      job = new StreamJob(argv, mayExit);      
+      job = new StreamJob(genArgs(), mayExit);      
       job.go();
-      File outFile = new File(".", OUTPUT_DIR + "/part-00000").getAbsoluteFile();
+      File outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
       String output = StreamUtil.slurp(outFile);
+      outFile.delete();
       System.err.println("outEx1=" + outputExpect);
       System.err.println("  out1=" + output);
       assertEquals(outputExpect, output);
-
     } catch(Exception e) {
       failTrace(e);
+    } finally {
+      INPUT_FILE.delete();
+      OUTPUT_DIR.delete();
     }
   }
 
-  void failTrace(Exception e)
+  private void failTrace(Exception e)
   {
     StringWriter sw = new StringWriter();
     e.printStackTrace(new PrintWriter(sw));


