hadoop-common-commits mailing list archives

From: cutt...@apache.org
Subject: svn commit: r437848 - in /lucene/hadoop/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/
Date: Mon, 28 Aug 2006 21:36:33 GMT
Author: cutting
Date: Mon Aug 28 14:36:32 2006
New Revision: 437848

URL: http://svn.apache.org/viewvc?rev=437848&view=rev
Log:
HADOOP-437.  contrib/streaming: Add support for gzipped inputs.  Contributed by Michel.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=437848&r1=437847&r2=437848&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Aug 28 14:36:32 2006
@@ -86,6 +86,9 @@
 21. HADOOP-486.  Add the job username to JobStatus instances returned
     by JobClient.  (Mahadev Konar via cutting)
 
+22. HADOOP-437.  contrib/streaming: Add support for gzipped inputs.
+    (Michel Tourn via cutting)
+
 
 Release 0.5.0 - 2006-08-04
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=437848&r1=437847&r2=437848&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Mon Aug 28 14:36:32 2006
@@ -166,6 +166,10 @@
       String argv = getPipeCommand(job);
       keyCols_ = getKeyColsFromPipeCommand(argv);
 
+      debug_ = (job.get("stream.debug") != null);
+      if(debug_) {
+        System.out.println("PipeMapRed: stream.debug=true");
+      }
       job_ = job;
 
       // Currently: null is identity reduce. REDUCE_NONE is no-map-outputs.
@@ -194,6 +198,7 @@
       optSideEffect_ = getUseSideEffect();
 
       if(optSideEffect_) {
+        // with -cluster local, the side-effect file is named e.g.: outnone/map_bw5nzv
         String fileName = job_.get("mapred.task.id");
         sideEffectPath_ = new Path(job_.getOutputPath(), fileName);
         FileSystem fs = FileSystem.get(job_);
@@ -360,7 +365,7 @@
       val.set(line.substring(pos+1));
     }
   }
-
+  
   class MROutputThread extends Thread
   {
     MROutputThread(OutputCollector output, Reporter reporter)
@@ -557,6 +562,7 @@
   int reportPortPlusOne_;
 
   boolean doPipe_;
+  boolean debug_;
 
   Process sim;
   Object doneLock_;
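
A note on the new debug switch above: PipeMapRed keys only on the presence of
the "stream.debug" property, not its value, so any non-null setting turns it
on. A minimal sketch, assuming the JobConf API of this era:

    import org.apache.hadoop.mapred.JobConf;

    public class StreamDebugDemo {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        job.set("stream.debug", "1");  // presence, not value, is what debug_ checks
        System.out.println(job.get("stream.debug") != null);  // true -> debug output on
      }
    }

From the streaming command line, -jobconf stream.debug=1 has the same effect.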

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?rev=437848&r1=437847&r2=437848&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Mon Aug 28 14:36:32 2006
@@ -72,7 +72,8 @@
     try {
       // 1/4 Hadoop in
       if(key instanceof BytesWritable) {
-        mapredKey_ = new String(((BytesWritable)key).get(), "UTF-8");
+        BytesWritable bKey = (BytesWritable)key;
+        mapredKey_ = new String(bKey.get(), 0, bKey.getSize(), "UTF-8");
       } else {
         mapredKey_ = key.toString();        
       }
@@ -84,7 +85,8 @@
       if(numExceptions_==0) {
         String sval;
         if(value instanceof BytesWritable) {
-          sval = new String(((BytesWritable)value).get(), "UTF-8");
+          BytesWritable bVal = (BytesWritable)value;
+          sval = new String(bVal.get(), 0, bVal.getSize(), "UTF-8");
         } else {
           sval = value.toString();
         }
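
The two hunks above fix a decoding bug: BytesWritable.get() returns the whole
backing array, which can be longer than the valid data, so decoding it without
the 0..getSize() range picks up stale trailing bytes. A minimal standalone
sketch of the failure mode, with a plain array standing in for the reused
buffer (no Hadoop classes needed):

    public class StaleBytesDemo {
      public static void main(String[] args) throws Exception {
        byte[] buffer = new byte[8];                          // reused buffer, tail is stale
        byte[] data = "hi".getBytes("UTF-8");
        System.arraycopy(data, 0, buffer, 0, data.length);
        int size = data.length;                               // plays the role of getSize()
        String wrong = new String(buffer, "UTF-8");           // includes six stale NUL bytes
        String right = new String(buffer, 0, size, "UTF-8");  // "hi"
        System.out.println(wrong.length() + " vs " + right.length());  // 8 vs 2
      }
    }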

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java?rev=437848&r1=437847&r2=437848&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java Mon Aug 28 14:36:32 2006
@@ -45,7 +45,7 @@
   protected static final Log LOG = LogFactory.getLog(StreamBaseRecordReader.class.getName());
   
   // custom JobConf properties for this class are prefixed with this namespace
-  final String CONF_NS = "stream.recordreader.";
+  final static String CONF_NS = "stream.recordreader.";
 
   public StreamBaseRecordReader(
     FSDataInputStream in, FileSplit split, Reporter reporter, JobConf job, FileSystem fs)

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java?rev=437848&r1=437847&r2=437848&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java Mon Aug 28 14:36:32 2006
@@ -62,6 +62,36 @@
     return b;
   }
 
+  static boolean isGzippedInput(JobConf job)
+  {
+    String val = job.get(StreamBaseRecordReader.CONF_NS + "compression");
+    return "gzip".equals(val);
+  }
+
+  public FileSplit[] getSplits(FileSystem fs, JobConf job, int numSplits)
+    throws IOException {
+      
+    if(isGzippedInput(job)) {
+      return getFullFileSplits(fs, job);
+    } else {
+      return super.getSplits(fs, job, numSplits);
+    }   
+  }
+  
+  /** For the compressed-files case: override InputFormatBase to produce one split per file. */
+  FileSplit[] getFullFileSplits(FileSystem fs, JobConf job)
+    throws IOException
+  {
+    Path[] files = listPaths(fs, job);
+    int numSplits = files.length;
+    ArrayList splits = new ArrayList(numSplits);
+    for (int i = 0; i < files.length; i++) {
+      Path file = files[i];
+      long splitSize = fs.getLength(file);
+      splits.add(new FileSplit(file, 0, splitSize));
+    }
+    return (FileSplit[])splits.toArray(new FileSplit[splits.size()]);
+  }
 
   protected Path[] listPaths(FileSystem fs, JobConf job)
     throws IOException
@@ -170,4 +200,5 @@
     return reader;
   }
 
+  
 }
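
The whole-file splits above exist because a GZIP stream can only be decoded
from its first byte; there is no way to start decompression at an arbitrary
offset, so each gzipped input file must go to a single map task intact. A
minimal standalone sketch of why (plain JDK, no Hadoop):

    import java.io.*;
    import java.util.zip.*;

    public class GzipWholeFileDemo {
      public static void main(String[] args) throws IOException {
        File f = File.createTempFile("demo", ".gz");
        OutputStream out = new GZIPOutputStream(new FileOutputStream(f));
        out.write("line one\nline two\n".getBytes("UTF-8"));
        out.close();

        // Decoding from offset 0 works.
        InputStream ok = new GZIPInputStream(new FileInputStream(f));
        ok.close();

        // Starting mid-file does not: the constructor expects the GZIP magic
        // header, which exists only at the start of the file.
        InputStream raw = new FileInputStream(f);
        raw.skip(10);
        try {
          new GZIPInputStream(raw);  // throws ZipException ("Not in GZIP format")
        } catch (ZipException expected) {
          System.out.println("mid-file start rejected: " + expected.getMessage());
        }
        raw.close();
      }
    }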

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=437848&r1=437847&r2=437848&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Mon Aug 28 14:36:32 2006
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
+import java.util.Map;
 
 import org.apache.commons.logging.*;
 
@@ -290,6 +291,11 @@
     System.out.println();
     System.out.println("To set the number of reduce tasks (num. of output files):");
     System.out.println("  -jobconf mapred.reduce.tasks=10");
+    System.out.println("To name the job (appears in the JobTrack Web UI):");
+    System.out.println("  -jobconf mapred.job.name='My Job' ");
+    System.out.println("To specify that line-oriented input is in gzip format:");
+    System.out.println("(at this time ALL input files must be gzipped and this is not recognized
based on file extension)");
+    System.out.println("   -jobconf stream.recordreader.compression=gzip ");
     System.out.println("To change the local temp directory:");
     System.out.println("  -jobconf dfs.data.dir=/tmp");
     System.out.println("Additional local temp directories with -cluster local:");

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java?rev=437848&r1=437847&r2=437848&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java Mon Aug 28 14:36:32 2006
@@ -17,6 +17,7 @@
 package org.apache.hadoop.streaming;
 
 import java.io.*;
+import java.util.zip.GZIPInputStream; 
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -41,35 +42,54 @@
     throws IOException
   {
     super(in, split, reporter, job, fs);
+    gzipped_ = StreamInputFormat.isGzippedInput(job);
+    if(gzipped_) {
+      din_ = new DataInputStream(new GZIPInputStream(in_));
+    } else {
+      din_ = in_;
+    }
   }
 
   public void seekNextRecordBoundary() throws IOException
   {
-    int bytesSkipped = 0;
-    if (start_ != 0) {
-      in_.seek(start_ - 1);
-      // scan to the next newline in the file
-      while (in_.getPos() < end_) {
-        char c = (char)in_.read();
-        bytesSkipped++;
-        if (c == '\r' || c == '\n') {
-          break;
+    if(gzipped_) {
+      // no skipping: use din_ as-is 
+      // assumes splitter created only one split per file
+      return;
+    } else {
+      int bytesSkipped = 0;
+      if (start_ != 0) {
+        in_.seek(start_ - 1);
+        // scan to the next newline in the file
+        while (in_.getPos() < end_) {
+          char c = (char)in_.read();
+          bytesSkipped++;
+          if (c == '\r' || c == '\n') {
+            break;
+          }
         }
       }
-    }
 
-    //System.out.println("getRecordReader start="+start_ + " end=" + end_ + " bytesSkipped"+bytesSkipped);
+      //System.out.println("getRecordReader start="+start_ + " end=" + end_ + " bytesSkipped"+bytesSkipped);
+    }
   }
 
   public synchronized boolean next(Writable key, Writable value)
     throws IOException {
-    long pos = in_.getPos();
-    if (pos >= end_)
-      return false;
+    if(gzipped_) {
+      // end-of-stream is detected by readLine() returning null
+    } else {
+      long pos = in_.getPos();
+      if (pos >= end_)
+        return false;
+    }
 
-    //((LongWritable)key).set(pos);           // key is position
+    //((LongWritable)key).set(pos);      // key is position
     //((UTF8)value).set(readLine(in));   // value is line
-    String line = readLine(in_);
+    String line = readLine(din_);
+    if(line == null) {
+        return false; // for gzipped_
+    }
 
     // key is line up to TAB, value is rest
     final boolean NOVAL = false;
@@ -92,22 +112,32 @@
 
 
   // from TextInputFormat
-  private static String readLine(FSDataInputStream in) throws IOException {
+  private static String readLine(InputStream in) throws IOException {
     StringBuffer buffer = new StringBuffer();
+    boolean over = true;
     while (true) {
 
       int b = in.read();
       if (b == -1)
         break;
-
+      
+      over = false;
       char c = (char)b;              // bug: this assumes eight-bit characters.
       if (c == '\r' || c == '\n')    // TODO || c == '\t' here
         break;
 
       buffer.append(c);
     }
-
-    return buffer.toString();
+    
+    if(over) {
+      return null;
+    } else {
+      return buffer.toString();
+    }
+    
   }
 
+  boolean gzipped_;
+  GZIPInputStream zin_;
+  DataInputStream din_; // GZIP or plain
 }
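
The readLine() change above is the linchpin of the gzip path: byte positions in
the compressed file say nothing about where the decompressed stream ends, so
next() can no longer compare getPos() against end_ and instead relies on
readLine() returning null (rather than "") at end-of-stream. An equivalent
standalone restatement of that sentinel convention:

    import java.io.*;

    public class NullEofDemo {
      // Same contract as the patched readLine: null only at end-of-stream,
      // "" for a genuinely empty line.
      static String readLine(InputStream in) throws IOException {
        int b = in.read();
        if (b == -1) {
          return null;                 // end-of-stream sentinel
        }
        StringBuffer buffer = new StringBuffer();
        while (b != -1) {
          char c = (char) b;           // same eight-bit assumption as the original
          if (c == '\r' || c == '\n') {
            break;
          }
          buffer.append(c);
          b = in.read();
        }
        return buffer.toString();
      }

      public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream("one\n\ntwo\n".getBytes("UTF-8"));
        String line;
        while ((line = readLine(in)) != null) {  // terminates exactly at EOF
          System.out.println("[" + line + "]");
        }
      }
    }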

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java?rev=437848&r1=437847&r2=437848&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java Mon Aug 28 14:36:32 2006
@@ -40,13 +40,13 @@
     numFailed_ = 0;
     // super.in_ ignored, using rin_ instead
   }
-  
-    
+
+
   public synchronized boolean next(Writable key, Writable value)
    throws IOException
-  {         
+  {
     boolean success;
-    do {    
+    do {
       if (!more_) return false;
       success = false;
       try {
@@ -61,7 +61,7 @@
       } catch(IOException io) {
         numFailed_++;
         if(numFailed_ < 100 || numFailed_ % 100 == 0) {
-          err_.println("StreamSequenceRecordReader: numFailed_/numRec_=" 
+          err_.println("StreamSequenceRecordReader: numFailed_/numRec_="
             + numFailed_+ "/" + numRec_);
         }
         io.printStackTrace(err_);
@@ -69,9 +69,9 @@
       }
     } while(!success);
     numRecStats("");
-    return more_;    
+    return more_;
   }
-  
+
 
   public void seekNextRecordBoundary() throws IOException
   {


