hadoop-common-commits mailing list archives

From omal...@apache.org
Subject svn commit: r679845 [2/3] - in /hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce: ./ example/ lib/ lib/input/ lib/map/ lib/output/ lib/partition/ lib/reduce/
Date Fri, 25 Jul 2008 15:57:42 GMT
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskID.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskID.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskID.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskID.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.text.NumberFormat;
+
+/**
+ * TaskID represents the immutable and unique identifier for 
+ * a Map or Reduce Task. Each TaskID encompasses the multiple attempts made to
+ * execute the Map or Reduce Task, each of which is uniquely identified by
+ * its TaskAttemptID.
+ * 
+ * TaskID consists of 3 parts. The first part is the {@link JobID} that this 
+ * TaskInProgress belongs to. The second part is either 'm' or 'r', 
+ * indicating whether the task is a map task or a reduce task. 
+ * The third part is the task number. <br> 
+ * An example TaskID is 
+ * <code>task_200707121733_0003_m_000005</code>, which represents the
+ * fifth map task in the third job running at the jobtracker 
+ * started at <code>200707121733</code>. 
+ * <p>
+ * Applications should never construct or parse TaskID strings; they should
+ * instead use the appropriate constructors or the {@link #forName(String)} 
+ * method. 
+ * 
+ * @see JobID
+ * @see TaskAttemptID
+ */
+public class TaskID extends ID {
+  private static final String TASK = "task";
+  private static char UNDERSCORE = '_';  
+  private static NumberFormat idFormat = NumberFormat.getInstance();
+  static {
+    idFormat.setGroupingUsed(false);
+    idFormat.setMinimumIntegerDigits(6);
+  }
+  
+  private JobID jobId;
+  private boolean isMap;
+
+  /**
+   * Constructs a TaskID object from given {@link JobID}.  
+   * @param jobId JobID that this tip belongs to 
+   * @param isMap whether the tip is a map 
+   * @param id the tip number
+   */
+  public TaskID(JobID jobId, boolean isMap, int id) {
+    super(id);
+    if(jobId == null) {
+      throw new IllegalArgumentException("jobId cannot be null");
+    }
+    this.jobId = jobId;
+    this.isMap = isMap;
+  }
+  
+  /**
+   * Constructs a TaskID object from the given parts.
+   * @param jtIdentifier jobTracker identifier
+   * @param jobId job number 
+   * @param isMap whether the tip is a map 
+   * @param id the tip number
+   */
+  public TaskID(String jtIdentifier, int jobId, boolean isMap, int id) {
+    this(new JobID(jtIdentifier, jobId), isMap, id);
+  }
+  
+  private TaskID() { }
+  
+  /** Returns the {@link JobID} object that this tip belongs to */
+  public JobID getJobID() {
+    return jobId;
+  }
+  
+  /**Returns whether this TaskID is a map ID */
+  public boolean isMap() {
+    return isMap;
+  }
+  
+  @Override
+  public boolean equals(Object o) {
+    if(o == null)
+      return false;
+    if(o.getClass().equals(TaskID.class)) {
+      TaskID that = (TaskID)o;
+      return this.id==that.id
+        && this.isMap == that.isMap
+        && this.jobId.equals(that.jobId);
+    }
+    else return false;
+  }
+
+  /** Compare TaskIDs first by jobIds, then by tip numbers. Reduces are 
+   * defined as greater than maps.*/
+  @Override
+  public int compareTo(ID o) {
+    TaskID that = (TaskID)o;
+    int jobComp = this.jobId.compareTo(that.jobId);
+    if(jobComp == 0) {
+      if(this.isMap == that.isMap) {
+        return this.id - that.id;
+      }
+      else return this.isMap ? -1 : 1;
+    }
+    else return jobComp;
+  }
+  
+  @Override
+  public String toString() { 
+    StringBuilder builder = new StringBuilder();
+    return builder.append(TASK).append(UNDERSCORE)
+      .append(toStringWOPrefix()).toString();
+  }
+
+  StringBuilder toStringWOPrefix() {
+    StringBuilder builder = new StringBuilder();
+    builder.append(jobId.toStringWOPrefix())
+      .append(isMap ? "_m_" : "_r_");
+    return builder.append(idFormat.format(id));
+  }
+  
+  @Override
+  public int hashCode() {
+    return toStringWOPrefix().toString().hashCode();
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    this.jobId = JobID.read(in);
+    this.isMap = in.readBoolean();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    jobId.write(out);
+    out.writeBoolean(isMap);
+  }
+  
+  public static TaskID read(DataInput in) throws IOException {
+    TaskID tipId = new TaskID();
+    tipId.readFields(in);
+    return tipId;
+  }
+  
+  /** Construct a TaskID object from given string 
+   * @return constructed TaskID object or null if the given String is null
+   * @throws IllegalArgumentException if the given string is malformed
+   */
+  public static TaskID forName(String str) 
+    throws IllegalArgumentException {
+    if(str == null)
+      return null;
+    try {
+      String[] parts = str.split("_");
+      if(parts.length == 5) {
+        if(parts[0].equals(TASK)) {
+          boolean isMap = false;
+          if(parts[3].equals("m")) isMap = true;
+          else if(parts[3].equals("r")) isMap = false;
+          else throw new Exception();
+          return new TaskID(parts[1], Integer.parseInt(parts[2]),
+              isMap, Integer.parseInt(parts[4]));
+        }
+      }
+    }catch (Exception ex) {//fall below
+    }
+    throw new IllegalArgumentException("TaskId string : " + str 
+        + " is not properly formed");
+  }
+  
+  /** 
+   * Returns a regex pattern which matches task IDs. Arguments can 
+   * be given as null, in which case that part of the regex will be generic.  
+   * For example, to obtain a regex matching <i>the first map task</i> 
+   * of <i>any jobtracker</i>, of <i>any job</i>, we would use:
+   * <pre> 
+   * TaskID.getTaskIDsPattern(null, null, true, 1);
+   * </pre>
+   * which will return:
+   * <pre> "task_[^_]*_[0-9]*_m_000001" </pre> 
+   * @param jtIdentifier jobTracker identifier, or null
+   * @param jobId job number, or null
+   * @param isMap whether the tip is a map, or null 
+   * @param taskId taskId number, or null
+   * @return a regex pattern matching TaskIDs
+   */
+  public static String getTaskIDsPattern(String jtIdentifier, Integer jobId
+      , Boolean isMap, Integer taskId) {
+    StringBuilder builder = new StringBuilder(TASK).append(UNDERSCORE)
+      .append(getTaskIDsPatternWOPrefix(jtIdentifier, jobId, isMap, taskId));
+    return builder.toString();
+  }
+  
+  static StringBuilder getTaskIDsPatternWOPrefix(String jtIdentifier
+      , Integer jobId, Boolean isMap, Integer taskId) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(JobID.getJobIDsPatternWOPrefix(jtIdentifier, jobId))
+      .append(UNDERSCORE)
+      .append(isMap != null ? (isMap ? "m" : "r") : "(m|r)").append(UNDERSCORE)
+      .append(taskId != null ? idFormat.format(taskId) : "[0-9]*");
+    return builder;
+  }
+  
+}
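
A minimal usage sketch of the TaskID API added above, for reference. This is
not part of the commit; the class name TaskIDDemo is made up and the ID values
simply reuse the example from the Javadoc.

    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.TaskID;

    public class TaskIDDemo {
      public static void main(String[] args) {
        // Map task number 5 of job 3 on a jobtracker started at 200707121733.
        TaskID mapTask = new TaskID(new JobID("200707121733", 3), true, 5);
        System.out.println(mapTask);   // task_200707121733_0003_m_000005

        // Round-trip through the string form instead of parsing it by hand.
        TaskID parsed = TaskID.forName("task_200707121733_0003_m_000005");
        System.out.println(parsed.equals(mapTask));   // true

        // Regex matching map task number 5 of any job on any jobtracker.
        System.out.println(TaskID.getTaskIDsPattern(null, null, true, 5));
      }
    }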

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskInputOutputContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskInputOutputContext.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskInputOutputContext.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskInputOutputContext.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A context object that allows input and output from the task. It is only
+ * supplied to the {@link Mapper} or {@link Reducer}.
+ * @param <KEYIN> the input key type for the task
+ * @param <VALUEIN> the input value type for the task
+ * @param <KEYOUT> the output key type for the task
+ * @param <VALUEOUT> the output value type for the task
+ */
+public abstract class TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
+    extends TaskAttemptContext {
+
+  public TaskInputOutputContext(Configuration conf, TaskAttemptID taskid) {
+    super(conf, taskid);
+  }
+
+  /**
+   * Advance to the next key, returning null if at the end of input.
+   * @param key the key object to read into, which may be null
+   * @return the key object that was read, or null if at the end of input
+   */
+  public abstract KEYIN nextKey(KEYIN key
+                                ) throws IOException, InterruptedException;
+  
+  /**
+   * Read the next value. Must be called after nextKey.
+   * @param value the value object to read into, which may be null
+   * @return the value object that was read
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract VALUEIN nextValue(VALUEIN value
+                                    ) throws IOException, InterruptedException;
+
+  /**
+   * Generate an output key/value pair.
+   */
+  public abstract void collect(KEYOUT key, VALUEOUT value
+                               ) throws IOException, InterruptedException;
+
+}
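
As a reading aid, a hedged sketch (not part of this commit) of the read loop
that the nextKey/nextValue/collect contract above implies; the class name
ContextLoopSketch and the concrete LongWritable/Text types are assumptions.

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.TaskInputOutputContext;

    class ContextLoopSketch {
      // Read a key, read its value, emit an output record, and repeat until
      // nextKey() returns null to signal the end of input.
      static void drive(TaskInputOutputContext<LongWritable, Text,
                                               Text, LongWritable> context)
          throws IOException, InterruptedException {
        LongWritable key = context.nextKey(null);     // null: let the context allocate
        while (key != null) {
          Text value = context.nextValue(null);
          context.collect(new Text(value), new LongWritable(key.get()));
          key = context.nextKey(key);                 // reuse the key object
        }
      }
    }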

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/example/WordCount.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/example/WordCount.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/example/WordCount.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/example/WordCount.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,69 @@
+package org.apache.hadoop.mapreduce.example;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+public class WordCount {
+
+  public static class TokenizerMapper 
+       extends Mapper<Object, Text, Text, IntWritable>{
+    
+    private final static IntWritable one = new IntWritable(1);
+    private Text word = new Text();
+      
+    public void map(Object key, Text value, Context context
+                    ) throws IOException, InterruptedException {
+      StringTokenizer itr = new StringTokenizer(value.toString());
+      while (itr.hasMoreTokens()) {
+        word.set(itr.nextToken());
+        context.collect(word, one);
+      }
+    }
+  }
+  
+  public static class IntSumReducer 
+       extends Reducer<Text,IntWritable,Text,IntWritable> {
+    private IntWritable result = new IntWritable();
+
+    public void reduce(Text key, Iterable<IntWritable> values, 
+                       Context context
+                       ) throws IOException, InterruptedException {
+      int sum = 0;
+      for (IntWritable val : values) {
+        sum += val.get();
+      }
+      result.set(sum);
+      context.collect(key, result);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new Configuration();
+    GenericOptionsParser parser = new GenericOptionsParser(conf, args);
+    String[] otherArgs = parser.getRemainingArgs();
+    if (otherArgs.length != 2) {
+      System.err.println("Usage: wordcount <in> <out>");
+      System.exit(2);
+    }
+    Job job = new Job(conf, "word count");
+    job.setMapperClass(TokenizerMapper.class);
+    job.setCombinerClass(IntSumReducer.class);
+    job.setReducerClass(IntSumReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+    FileInputFormat.addInputPath(conf, new Path(otherArgs[0]));
+    FileOutputFormat.setOutputPath(conf, new Path(otherArgs[1]));
+    System.exit(job.waitForCompletion() ? 0 : 1);
+  }
+}

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+/** 
+ * A base class for file-based {@link InputFormat}s.
+ * 
+ * <p><code>FileInputFormat</code> is the base class for all file-based 
+ * <code>InputFormat</code>s. This provides a generic implementation of
+ * {@link #getSplits(JobContext)}.
+ * Subclasses of <code>FileInputFormat</code> can also override the 
+ * {@link #isSplitable(JobContext, Path)} method to ensure input-files are
+ * not split-up and are processed as a whole by {@link Mapper}s.
+ */
+public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
+
+  public static final Log LOG = LogFactory.getLog(FileInputFormat.class);
+
+  private static final double SPLIT_SLOP = 1.1;   // 10% slop
+
+  private static final PathFilter hiddenFileFilter = new PathFilter(){
+      public boolean accept(Path p){
+        String name = p.getName(); 
+        return !name.startsWith("_") && !name.startsWith("."); 
+      }
+    }; 
+
+  /**
+   * Proxy PathFilter that accepts a path only if all filters given in the
+   * constructor do. Used by listStatus() to apply the built-in
+   * hiddenFileFilter together with a user-provided one (if any).
+   */
+  private static class MultiPathFilter implements PathFilter {
+    private List<PathFilter> filters;
+
+    public MultiPathFilter(List<PathFilter> filters) {
+      this.filters = filters;
+    }
+
+    public boolean accept(Path path) {
+      for (PathFilter filter : filters) {
+        if (!filter.accept(path)) {
+          return false;
+        }
+      }
+      return true;
+    }
+  }
+
+  /**
+   * Get the lower bound on split size imposed by the format.
+   * @return the number of bytes of the minimal split for this format
+   */
+  protected long getFormatMinSplitSize() {
+    return 1;
+  }
+
+  /**
+   * Is the given filename splitable? Usually true, but if the file is
+   * stream compressed, it will not be.
+   * 
+   * <code>FileInputFormat</code> implementations can override this and return
+   * <code>false</code> to ensure that individual input files are never split-up
+   * so that {@link Mapper}s process entire files.
+   * 
+   * @param context the job context
+   * @param filename the file name to check
+   * @return is this file splitable?
+   */
+  protected boolean isSplitable(JobContext context, Path filename) {
+    return true;
+  }
+
+  /**
+   * Set a PathFilter to be applied to the input paths for the map-reduce job.
+   *
+   * @param filter the PathFilter class use for filtering the input paths.
+   */
+  public static void setInputPathFilter(Configuration conf,
+                                        Class<? extends PathFilter> filter) {
+    conf.setClass("mapred.input.pathFilter.class", filter, PathFilter.class);
+  }
+
+  public static void setMinInputSplitSize(Configuration conf,
+                                          long size) {
+    conf.setLong("mapred.min.split.size", size);
+  }
+
+  public static long getMinSplitSize(Configuration conf) {
+    return conf.getLong("mapred.min.split.size", 1L);
+  }
+
+  public static void setMaxInputSplitSize(Configuration conf,
+                                          long size) {
+    conf.setLong("mapred.max.split.size", size);
+  }
+
+  public static long getMaxSplitSize(Configuration conf) {
+    return conf.getLong("mapred.max.split.size", Long.MAX_VALUE);
+  }
+
+  /**
+   * Get the PathFilter instance, if any, set for the input paths.
+   *
+   * @return the PathFilter instance set for the job, or null if none has been set.
+   */
+  public static PathFilter getInputPathFilter(Configuration conf) {
+    Class<?> filterClass = conf.getClass("mapred.input.pathFilter.class", null,
+        PathFilter.class);
+    return (filterClass != null) ?
+        (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
+  }
+
+  /** List input directories.
+   * Subclasses may override to, e.g., select only files matching a regular
+   * expression. 
+   * 
+   * @param job the job to list input paths for
+   * @return array of FileStatus objects
+   * @throws IOException if no input paths are specified.
+   */
+  protected List<FileStatus> listStatus(Configuration job
+                                              ) throws IOException {
+    List<FileStatus> result = new ArrayList<FileStatus>();
+    Path[] dirs = getInputPaths(job);
+    if (dirs.length == 0) {
+      throw new IOException("No input paths specified in job");
+    }
+
+    List<IOException> errors = new ArrayList<IOException>();
+    
+    // creates a MultiPathFilter with the hiddenFileFilter and the
+    // user provided one (if any).
+    List<PathFilter> filters = new ArrayList<PathFilter>();
+    filters.add(hiddenFileFilter);
+    PathFilter jobFilter = getInputPathFilter(job);
+    if (jobFilter != null) {
+      filters.add(jobFilter);
+    }
+    PathFilter inputFilter = new MultiPathFilter(filters);
+    
+    for (int i=0; i < dirs.length; ++i) {
+      Path p = dirs[i];
+      FileSystem fs = p.getFileSystem(job); 
+      FileStatus[] matches = fs.globStatus(p, inputFilter);
+      if (matches == null) {
+        errors.add(new IOException("Input path does not exist: " + p));
+      } else if (matches.length == 0) {
+        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
+      } else {
+        for (FileStatus globStat: matches) {
+          if (globStat.isDir()) {
+            for(FileStatus stat: fs.listStatus(globStat.getPath(),
+                inputFilter)) {
+              result.add(stat);
+            }          
+          } else {
+            result.add(globStat);
+          }
+        }
+      }
+    }
+
+    if (!errors.isEmpty()) {
+      throw new InvalidInputException(errors);
+    }
+    LOG.info("Total input paths to process : " + result.size()); 
+    return result;
+  }
+  
+
+  /** Splits files returned by {@link #listStatus(Configuration)} when
+   * they're too big.*/ 
+  public List<InputSplit> getSplits(JobContext context
+                                    ) throws IOException {
+    Configuration job = context.getConfiguration();    
+    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
+    long maxSize = getMaxSplitSize(job);
+
+    // generate splits
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    for (FileStatus file: listStatus(job)) {
+      Path path = file.getPath();
+      FileSystem fs = path.getFileSystem(context.getConfiguration());
+      long length = file.getLen();
+      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
+      if ((length != 0) && isSplitable(context, path)) { 
+        long blockSize = file.getBlockSize();
+        long splitSize = computeSplitSize(blockSize, minSize, maxSize);
+
+        long bytesRemaining = length;
+        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
+          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
+          splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
+                                   blkLocations[blkIndex].getHosts()));
+          bytesRemaining -= splitSize;
+        }
+        
+        if (bytesRemaining != 0) {
+          splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 
+                     blkLocations[blkLocations.length-1].getHosts()));
+        }
+      } else if (length != 0) {
+        splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
+      } else { 
+        //Create empty hosts array for zero length files
+        splits.add(new FileSplit(path, 0, length, new String[0]));
+      }
+    }
+    LOG.debug("Total # of splits: " + splits.size());
+    return splits;
+  }
+
+  protected long computeSplitSize(long blockSize, long minSize,
+                                  long maxSize) {
+    return Math.max(minSize, Math.min(maxSize, blockSize));
+  }
+
+  protected int getBlockIndex(BlockLocation[] blkLocations, 
+                              long offset) {
+    for (int i = 0 ; i < blkLocations.length; i++) {
+      // is the offset inside this block?
+      if ((blkLocations[i].getOffset() <= offset) &&
+          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
+        return i;
+      }
+    }
+    BlockLocation last = blkLocations[blkLocations.length -1];
+    long fileLength = last.getOffset() + last.getLength() -1;
+    throw new IllegalArgumentException("Offset " + offset + 
+                                       " is outside of file (0.." +
+                                       fileLength + ")");
+  }
+
+  /**
+   * Sets the given comma separated paths as the list of inputs 
+   * for the map-reduce job.
+   * 
+   * @param conf Configuration of the job
+   * @param commaSeparatedPaths Comma separated paths to be set as 
+   *        the list of inputs for the map-reduce job.
+   */
+  public static void setInputPaths(Configuration conf, 
+                                   String commaSeparatedPaths
+                                   ) throws IOException {
+    setInputPaths(conf, StringUtils.stringToPath(
+                        getPathStrings(commaSeparatedPaths)));
+  }
+
+  /**
+   * Add the given comma separated paths to the list of inputs for
+   *  the map-reduce job.
+   * 
+   * @param conf The configuration of the job 
+   * @param commaSeparatedPaths Comma separated paths to be added to
+   *        the list of inputs for the map-reduce job.
+   */
+  public static void addInputPaths(Configuration conf, 
+                                   String commaSeparatedPaths
+                                   ) throws IOException {
+    for (String str : getPathStrings(commaSeparatedPaths)) {
+      addInputPath(conf, new Path(str));
+    }
+  }
+
+  /**
+   * Set the array of {@link Path}s as the list of inputs
+   * for the map-reduce job.
+   * 
+   * @param conf Configuration of the job. 
+   * @param inputPaths the {@link Path}s of the input directories/files 
+   * for the map-reduce job.
+   */ 
+  public static void setInputPaths(Configuration conf, 
+                                   Path... inputPaths) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    Path path = inputPaths[0].makeQualified(fs);
+    StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
+    for(int i = 1; i < inputPaths.length;i++) {
+      str.append(StringUtils.COMMA_STR);
+      path = inputPaths[i].makeQualified(fs);
+      str.append(StringUtils.escapeString(path.toString()));
+    }
+    conf.set("mapred.input.dir", str.toString());
+  }
+
+  /**
+   * Add a {@link Path} to the list of inputs for the map-reduce job.
+   * 
+   * @param conf The configuration of the job 
+   * @param path {@link Path} to be added to the list of inputs for 
+   *            the map-reduce job.
+   */
+  public static void addInputPath(Configuration conf, 
+                                  Path path) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    path = path.makeQualified(fs);
+    String dirStr = StringUtils.escapeString(path.toString());
+    String dirs = conf.get("mapred.input.dir");
+    conf.set("mapred.input.dir", dirs == null ? dirStr : dirs + "," + dirStr);
+  }
+  
+  // This method escapes commas in the glob pattern of the given paths.
+  private static String[] getPathStrings(String commaSeparatedPaths) {
+    int length = commaSeparatedPaths.length();
+    int curlyOpen = 0;
+    int pathStart = 0;
+    boolean globPattern = false;
+    List<String> pathStrings = new ArrayList<String>();
+    
+    for (int i=0; i<length; i++) {
+      char ch = commaSeparatedPaths.charAt(i);
+      switch(ch) {
+        case '{' : {
+          curlyOpen++;
+          if (!globPattern) {
+            globPattern = true;
+          }
+          break;
+        }
+        case '}' : {
+          curlyOpen--;
+          if (curlyOpen == 0 && globPattern) {
+            globPattern = false;
+          }
+          break;
+        }
+        case ',' : {
+          if (!globPattern) {
+            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
+            pathStart = i + 1 ;
+          }
+          break;
+        }
+      }
+    }
+    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
+    
+    return pathStrings.toArray(new String[0]);
+  }
+  
+  /**
+   * Get the list of input {@link Path}s for the map-reduce job.
+   * 
+   * @param conf The configuration of the job 
+   * @return the list of input {@link Path}s for the map-reduce job.
+   */
+  public static Path[] getInputPaths(Configuration conf) {
+    String dirs = conf.get("mapred.input.dir", "");
+    String [] list = StringUtils.split(dirs);
+    Path[] result = new Path[list.length];
+    for (int i = 0; i < list.length; i++) {
+      result[i] = new Path(StringUtils.unEscapeString(list[i]));
+    }
+    return result;
+  }
+
+}
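
To illustrate the extension point described in the class Javadoc, a hedged
sketch (not part of this commit): a format that reuses LineRecordReader but
overrides isSplitable() so each input file is processed whole by a single map.
The class name WholeFileTextInputFormat is made up.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

    public class WholeFileTextInputFormat
        extends FileInputFormat<LongWritable, Text> {

      // Never split: getSplits() then produces exactly one split per file.
      @Override
      protected boolean isSplitable(JobContext context, Path filename) {
        return false;
      }

      // Reuse the line-oriented reader added elsewhere in this commit.
      @Override
      public RecordReader<LongWritable, Text> createRecordReader(
          InputSplit split, TaskAttemptContext context) {
        return new LineRecordReader();
      }
    }

For formats that do split, the driver can still bound split sizes through the
setMinInputSplitSize() and setMaxInputSplitSize() helpers defined above.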

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileSplit.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileSplit.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileSplit.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+
+/** A section of an input file.  Returned by {@link
+ * InputFormat#getSplits(JobContext)} and passed to
+ * {@link InputFormat#createRecordReader(InputSplit,TaskAttemptContext)}. */
+public class FileSplit extends InputSplit {
+  private Path file;
+  private long start;
+  private long length;
+  private String[] hosts;
+
+  FileSplit() {}
+
+  /** Constructs a split with host information
+   *
+   * @param file the file name
+   * @param start the position of the first byte in the file to process
+   * @param length the number of bytes in the file to process
+   * @param hosts the list of hosts containing the block, possibly null
+   */
+  public FileSplit(Path file, long start, long length, String[] hosts) {
+    this.file = file;
+    this.start = start;
+    this.length = length;
+    this.hosts = hosts;
+  }
+ 
+  /** The file containing this split's data. */
+  public Path getPath() { return file; }
+  
+  /** The position of the first byte in the file to process. */
+  public long getStart() { return start; }
+  
+  /** The number of bytes in the file to process. */
+  public long getLength() { return length; }
+
+  public String toString() { return file + ":" + start + "+" + length; }
+
+  ////////////////////////////////////////////
+  // Writable methods
+  ////////////////////////////////////////////
+
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, file.toString());
+    out.writeLong(start);
+    out.writeLong(length);
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    file = new Path(Text.readString(in));
+    start = in.readLong();
+    length = in.readLong();
+    hosts = null;
+  }
+
+  public String[] getLocations() throws IOException {
+    if (this.hosts == null) {
+      return new String[]{};
+    } else {
+      return this.hosts;
+    }
+  }
+}

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/InvalidInputException.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/InvalidInputException.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/InvalidInputException.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/InvalidInputException.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Iterator;
+
+/**
+ * This class wraps a list of problems with the input, so that the user
+ * can get a list of problems together instead of finding and fixing them one 
+ * by one.
+ */
+public class InvalidInputException extends IOException {
+  private List<IOException> problems;
+  
+  /**
+   * Create the exception with the given list.
+   * @param probs the list of problems to report. this list is not copied.
+   */
+  public InvalidInputException(List<IOException> probs) {
+    problems = probs;
+  }
+  
+  /**
+   * Get the complete list of the problems reported.
+   * @return the list of problems, which must not be modified
+   */
+  public List<IOException> getProblems() {
+    return problems;
+  }
+  
+  /**
+   * Get a summary message of the problems found.
+   * @return the concatenated messages from all of the problems.
+   */
+  public String getMessage() {
+    StringBuffer result = new StringBuffer();
+    Iterator<IOException> itr = problems.iterator();
+    while(itr.hasNext()) {
+      result.append(itr.next().getMessage());
+      if (itr.hasNext()) {
+        result.append("\n");
+      }
+    }
+    return result.toString();
+  }
+}
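
A short hedged sketch (not part of this commit) of how a driver might surface
the whole problem list at once; the helper class name InputProblemReporter is
made up.

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;

    class InputProblemReporter {
      // Print every collected problem rather than only the first message,
      // relying on getProblems() defined above.
      static void report(InvalidInputException e) {
        for (IOException problem : e.getProblems()) {
          System.err.println("invalid input: " + problem.getMessage());
        }
      }
    }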

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+
+/**
+ * Treats keys as offsets in the file and values as lines. 
+ */
+public class LineRecordReader extends RecordReader<LongWritable, Text> {
+  private static final Log LOG = LogFactory.getLog(LineRecordReader.class);
+
+  private CompressionCodecFactory compressionCodecs = null;
+  private long start;
+  private long pos;
+  private long end;
+  private LineReader in;
+  int maxLineLength;
+
+  /**
+   * A class that provides a line reader from an input stream.
+   */
+  public static class LineReader {
+    private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+    private int bufferSize = DEFAULT_BUFFER_SIZE;
+    private InputStream in;
+    private byte[] buffer;
+    // the number of bytes of real data in the buffer
+    private int bufferLength = 0;
+    // the current position in the buffer
+    private int bufferPosn = 0;
+
+    /**
+     * Create a line reader that reads from the given stream using the 
+     * given buffer-size.
+     * @param in the input stream to read from
+     * @param bufferSize the size of the read buffer, in bytes
+     */
+    LineReader(InputStream in, int bufferSize) {
+      this.in = in;
+      this.bufferSize = bufferSize;
+      this.buffer = new byte[this.bufferSize];
+    }
+
+    /**
+     * Create a line reader that reads from the given stream using the
+     * <code>io.file.buffer.size</code> specified in the given
+     * <code>Configuration</code>.
+     * @param in input stream
+     * @param conf configuration
+     * @throws IOException
+     */
+    public LineReader(InputStream in, Configuration conf) throws IOException {
+      this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
+    }
+
+    /**
+     * Fill the buffer with more data.
+     * @return was there more data?
+     * @throws IOException
+     */
+    boolean backfill() throws IOException {
+      bufferPosn = 0;
+      bufferLength = in.read(buffer);
+      return bufferLength > 0;
+    }
+    
+    /**
+     * Close the underlying stream.
+     * @throws IOException
+     */
+    public void close() throws IOException {
+      in.close();
+    }
+    
+    /**
+     * Read from the InputStream into the given Text.
+     * @param str the object to store the given line
+     * @param maxLineLength the maximum number of bytes to store into str.
+     * @param maxBytesToConsume the maximum number of bytes to consume in this call.
+     * @return the number of bytes read including the newline
+     * @throws IOException if the underlying stream throws
+     */
+    public int readLine(Text str, int maxLineLength,
+                        int maxBytesToConsume) throws IOException {
+      str.clear();
+      boolean hadFinalNewline = false;
+      boolean hadFinalReturn = false;
+      boolean hitEndOfFile = false;
+      int startPosn = bufferPosn;
+      long bytesConsumed = 0;
+      outerLoop: while (true) {
+        if (bufferPosn >= bufferLength) {
+          if (!backfill()) {
+            hitEndOfFile = true;
+            break;
+          }
+        }
+        startPosn = bufferPosn;
+        for(; bufferPosn < bufferLength; ++bufferPosn) {
+          switch (buffer[bufferPosn]) {
+          case '\n':
+            hadFinalNewline = true;
+            bufferPosn += 1;
+            break outerLoop;
+          case '\r':
+            if (hadFinalReturn) {
+              // leave this \r in the stream, so we'll get it next time
+              break outerLoop;
+            }
+            hadFinalReturn = true;
+            break;
+          default:
+            if (hadFinalReturn) {
+              break outerLoop;
+            }
+          }        
+        }
+        bytesConsumed += bufferPosn - startPosn;
+        int length = bufferPosn - startPosn - (hadFinalReturn ? 1 : 0);
+        length = (int)Math.min(length, maxLineLength - str.getLength());
+        if (length >= 0) {
+          str.append(buffer, startPosn, length);
+        }
+        if (bytesConsumed >= maxBytesToConsume) {
+          return (int)Math.min(bytesConsumed, (long)Integer.MAX_VALUE);
+        }
+      }
+      int newlineLength = (hadFinalNewline ? 1 : 0) + (hadFinalReturn ? 1 : 0);
+      if (!hitEndOfFile) {
+        bytesConsumed += bufferPosn - startPosn;
+        int length = bufferPosn - startPosn - newlineLength;
+        length = (int)Math.min(length, maxLineLength - str.getLength());
+        if (length > 0) {
+          str.append(buffer, startPosn, length);
+        }
+      }
+      return (int)Math.min(bytesConsumed, (long)Integer.MAX_VALUE);
+    }
+
+    /**
+     * Read from the InputStream into the given Text.
+     * @param str the object to store the given line
+     * @param maxLineLength the maximum number of bytes to store into str.
+     * @return the number of bytes read including the newline
+     * @throws IOException if the underlying stream throws
+     */
+    public int readLine(Text str, int maxLineLength) throws IOException {
+      return readLine(str, maxLineLength, Integer.MAX_VALUE);
+  }
+
+    /**
+     * Read from the InputStream into the given Text.
+     * @param str the object to store the given line
+     * @return the number of bytes read including the newline
+     * @throws IOException if the underlying stream throws
+     */
+    public int readLine(Text str) throws IOException {
+      return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
+    }
+
+  }
+
+  public void initialize(InputSplit genericSplit,
+                         TaskAttemptContext context) throws IOException {
+    FileSplit split = (FileSplit) genericSplit;
+    Configuration job = context.getConfiguration();
+    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
+                                    Integer.MAX_VALUE);
+    start = split.getStart();
+    end = start + split.getLength();
+    final Path file = split.getPath();
+    compressionCodecs = new CompressionCodecFactory(job);
+    final CompressionCodec codec = compressionCodecs.getCodec(file);
+
+    // open the file and seek to the start of the split
+    FileSystem fs = file.getFileSystem(job);
+    FSDataInputStream fileIn = fs.open(split.getPath());
+    boolean skipFirstLine = false;
+    if (codec != null) {
+      in = new LineReader(codec.createInputStream(fileIn), job);
+      end = Long.MAX_VALUE;
+    } else {
+      if (start != 0) {
+        skipFirstLine = true;
+        --start;
+        fileIn.seek(start);
+      }
+      in = new LineReader(fileIn, job);
+    }
+    if (skipFirstLine) {  // skip first line and re-establish "start".
+      start += in.readLine(new Text(), 0,
+                           (int)Math.min((long)Integer.MAX_VALUE, end - start));
+    }
+    this.pos = start;
+  }
+  
+  public LongWritable nextKey(LongWritable key) throws IOException {
+    if (key == null) {
+      key = new LongWritable();
+    }
+    key.set(pos);
+    return key;
+  }
+
+  public Text nextValue(Text value) throws IOException {
+    if (value == null) {
+      value = new Text();
+    }
+    while (pos < end) {
+      int newSize = in.readLine(value, maxLineLength,
+                                Math.max((int)Math.min(Integer.MAX_VALUE, 
+                                                       end-pos),
+                                         maxLineLength));
+      if (newSize == 0) {
+        return null;
+      }
+      pos += newSize;
+      if (newSize < maxLineLength) {
+        break;
+      }
+
+      // line too long. try again
+      LOG.info("Skipped line of size " + newSize + " at pos " + 
+               (pos - newSize));
+    }
+    return value;
+  }
+
+  /**
+   * Get the progress within the split
+   */
+  public float getProgress() {
+    if (start == end) {
+      return 0.0f;
+    } else {
+      return Math.min(1.0f, (pos - start) / (float)(end - start));
+    }
+  }
+  
+  public synchronized void close() throws IOException {
+    if (in != null) {
+      in.close(); 
+    }
+  }
+}
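
A hedged sketch (not part of this commit) that exercises the nested LineReader
directly on a local stream, showing the readLine() contract: the return value
counts the newline bytes, so summing it tracks byte offsets. The class name
LineReaderDemo is made up.

    import java.io.FileInputStream;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

    public class LineReaderDemo {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        LineRecordReader.LineReader reader =
            new LineRecordReader.LineReader(new FileInputStream(args[0]), conf);
        Text line = new Text();
        long offset = 0;
        int consumed;
        while ((consumed = reader.readLine(line)) > 0) {   // 0 means end of stream
          System.out.println(offset + "\t" + line);
          offset += consumed;                              // newline bytes included
        }
        reader.close();
      }
    }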

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFormat.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFormat.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFormat.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/** An {@link InputFormat} for {@link SequenceFile}s. */
+public class SequenceFileInputFormat<K, V> extends FileInputFormat<K, V> {
+
+  @Override
+  public RecordReader<K, V> createRecordReader(InputSplit split,
+                                               TaskAttemptContext context
+                                               ) throws IOException {
+    return new SequenceFileRecordReader<K,V>();
+  }
+
+  protected long getFormatMinSplitSize() {
+    return SequenceFile.SYNC_INTERVAL;
+  }
+
+  protected List<FileStatus> listStatus(Configuration job
+                                              )throws IOException {
+
+    List<FileStatus> files = super.listStatus(job);
+    int len = files.size();
+    for(int i=0; i < len; ++i) {
+      FileStatus file = files.get(i);
+      if (file.isDir()) {     // it's a MapFile
+        Path p = file.getPath();
+        FileSystem fs = p.getFileSystem(job);
+        // use the data file
+        files.set(i, fs.getFileStatus(new Path(p, MapFile.DATA_FILE_NAME)));
+      }
+    }
+    return files;
+  }
+}
+

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileRecordReader.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileRecordReader.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileRecordReader.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/** An {@link RecordReader} for {@link SequenceFile}s. */
+public class SequenceFileRecordReader<K, V> extends RecordReader<K, V> {
+  private SequenceFile.Reader in;
+  private long start;
+  private long end;
+  private boolean more = true;
+  protected Configuration conf;
+  
+  @Override
+  public void initialize(InputSplit split, 
+                         TaskAttemptContext context
+                         ) throws IOException, InterruptedException {
+    FileSplit fileSplit = (FileSplit) split;
+    conf = context.getConfiguration();    
+    Path path = fileSplit.getPath();
+    FileSystem fs = path.getFileSystem(conf);
+    this.in = new SequenceFile.Reader(fs, path, conf);
+    this.end = fileSplit.getStart() + fileSplit.getLength();
+
+    if (fileSplit.getStart() > in.getPosition()) {
+      in.sync(fileSplit.getStart());                  // sync to start
+    }
+
+    this.start = in.getPosition();
+    more = start < end;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public K nextKey(K key) throws IOException, InterruptedException {
+    if (!more) {
+      return null;
+    }
+    long pos = in.getPosition();
+    K result = (K) in.next(key);
+    if (result == null || (pos >= end && in.syncSeen())) {
+      more = false;
+      result = null;
+    }
+    return result;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public V nextValue(V value) throws IOException, InterruptedException {
+    return (V) in.getCurrentValue(value);
+  }
+  
+  /**
+   * Return the progress within the input split
+   * @return 0.0 to 1.0 of the input byte range
+   */
+  public float getProgress() throws IOException {
+    if (end == start) {
+      return 0.0f;
+    } else {
+      return Math.min(1.0f, (in.getPosition() - start) / (float)(end - start));
+    }
+  }
+  
+  public synchronized void close() throws IOException { in.close(); }
+  
+}
+

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/** An {@link InputFormat} for plain text files.  Files are broken into lines.
+ * Either linefeed or carriage-return is used to signal end of line.  Keys are
+ * the position in the file, and values are the line of text. */
+public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
+
+  @Override
+  public RecordReader<LongWritable, Text> 
+    createRecordReader(InputSplit split,
+                       TaskAttemptContext context) {
+    return new LineRecordReader();
+  }
+
+  protected boolean isSplitable(JobContext context, Path file) {
+    CompressionCodec codec = 
+      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
+    return codec == null;
+  }
+
+}

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/InverseMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/InverseMapper.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/InverseMapper.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/InverseMapper.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.map;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.Mapper;
+
+/** A {@link Mapper} that swaps keys and values. */
+public class InverseMapper<K, V> extends Mapper<K,V,V,K> {
+
+  /** The inverse function.  Input keys and values are swapped.*/
+  public void map(K key, V value, Context context
+                  ) throws IOException, InterruptedException {
+    context.collect(value, key);
+  }
+  
+}

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.map;
+
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Multithreaded implementation of {@link org.apache.hadoop.mapreduce.Mapper}.
+ * <p>
+ * It can be used instead of the default implementation,
+ * {@link org.apache.hadoop.mapred.MapRunner}, when the Map operation is not
+ * CPU bound, in order to improve throughput.
+ * <p>
+ * Mapper implementations run through this class must be thread-safe.
+ * <p>
+ * The Map-Reduce job has to be configured with the mapper to use via 
+ * {@link #setMapperClass(Configuration, Class)} and
+ * the number of threads the thread-pool can use with the
+ * {@link #setNumberOfThreads(Configuration, int)} method. The default
+ * value is 10 threads.
+ */
+public class MultithreadedMapper<K1, V1, K2, V2> 
+  extends Mapper<K1, V1, K2, V2> {
+
+  private static final Log LOG = LogFactory.getLog(MultithreadedMapper.class);
+  private Class<Mapper<K1,V1,K2,V2>> mapClass;
+  private Context outer;
+  private List<MapRunner> runners;
+
+  public static int getNumberOfThreads(Configuration conf) {
+    return conf.getInt("mapred.map.multithreadedrunner.threads", 10);
+  }
+
+  public static void setNumberOfThreads(Configuration conf, int threads) {
+    conf.setInt("mapred.map.multithreadedrunner.threads", threads);
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <K1,V1,K2,V2>
+  Class<Mapper<K1,V1,K2,V2>> getMapperClass(Configuration conf) {
+    return (Class<Mapper<K1,V1,K2,V2>>) 
+           conf.getClass("mapred.map.multithreadedrunner.class",
+                         Mapper.class);
+  }
+  
+  public static <K1,V1,K2,V2> 
+  void setMapperClass(Configuration conf, 
+                      Class<Mapper<K1,V1,K2,V2>> cls) {
+    if (MultithreadedMapper.class.isAssignableFrom(cls)) {
+      throw new IllegalArgumentException("Can't have recursive " + 
+                                         "MultithreadedMapper instances.");
+    }
+    conf.setClass("mapred.map.multithreadedrunner.class", cls, Mapper.class);
+  }
+
+  public void run(Context context) throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    outer = context;
+    int numberOfThreads = getNumberOfThreads(conf);
+    mapClass = getMapperClass(conf);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Configuring multithread runner to use " + numberOfThreads + 
+                " threads");
+    }
+    
+    runners = new ArrayList<MapRunner>(numberOfThreads);
+    for(int i=0; i < numberOfThreads; ++i) {
+      MapRunner runner = new MapRunner();
+      runner.start();
+      runners.add(runner);
+    }
+    for(int i=0; i < numberOfThreads; ++i) {
+      MapRunner runner = runners.get(i);
+      runner.join();
+      Throwable th = runner.getThrowable();
+      if (th != null) {
+        if (th instanceof IOException) {
+          throw (IOException) th;
+        } else if (th instanceof InterruptedException) {
+          throw (InterruptedException) th;
+        } else {
+          throw (RuntimeException) th;
+        }
+      }
+    }
+  }
+
+  private class SubMapContext extends Context {
+    private K1 key;
+    private V1 value;
+    
+    SubMapContext() {
+      super(outer.getConfiguration(), outer.getTaskAttemptId());
+    }
+
+    @Override
+    public InputSplit getInputSplit() {
+      synchronized (outer) {
+        return outer.getInputSplit();
+      }
+    }
+
+    @Override
+    public Counter getCounter(Enum<?> counterName) {
+      synchronized (outer) {
+        return outer.getCounter(counterName);
+      }
+    }
+
+    @Override
+    public Counter getCounter(String groupName, String counterName) {
+      synchronized (outer) {
+        return outer.getCounter(groupName, counterName);
+      }
+    }
+
+    @Override
+    public void progress() {
+      synchronized (outer) {
+        outer.progress();
+      }
+    }
+
+    @Override
+    public void collect(K2 key, V2 value) throws IOException,
+                                         InterruptedException {
+      synchronized (outer) {
+        outer.collect(key, value);
+      }
+    }
+
+    @Override
+    public K1 nextKey(K1 k) throws IOException, InterruptedException {
+      synchronized (outer) {
+        key = outer.nextKey(key);
+        if (key != null) {
+          value = outer.nextValue(value);
+        }
+        return key;
+      }
+    }
+    
+    public V1 nextValue(V1 v) throws IOException, InterruptedException {
+      return value;
+    }
+  }
+
+  private class MapRunner extends Thread {
+    private Mapper<K1,V1,K2,V2> mapper;
+    private Context context;
+    private Throwable throwable;
+
+    @SuppressWarnings("unchecked")
+    MapRunner() {
+      context = new SubMapContext();
+      mapper = (Mapper<K1,V1,K2,V2>) 
+        ReflectionUtils.newInstance(mapClass, context.getConfiguration());
+    }
+
+    public Throwable getThrowable() {
+      return throwable;
+    }
+
+    public void run() {
+      try {
+        mapper.run(context);
+      } catch (Throwable ie) {
+        throwable = ie;
+      }
+    }
+  }
+
+}
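Wiring a job to this class happens through the two static setters above; a small sketch (the thread count of 20 is illustrative; any thread-safe Mapper class works):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;

    public class MultithreadedSetup {
      public static <K1, V1, K2, V2> void configure(
          Configuration conf, Class<Mapper<K1, V1, K2, V2>> workMapper) {
        // The job runs MultithreadedMapper itself; the real map logic is delegated
        // to workMapper, which therefore has to be thread-safe.
        MultithreadedMapper.setMapperClass(conf, workMapper);
        // Raise the pool above the default of 10, e.g. for I/O-bound map work.
        MultithreadedMapper.setNumberOfThreads(conf, 20);
      }
    }

Note that because Class is invariant, the Class<Mapper<K1,V1,K2,V2>> parameter of setMapperClass will not accept a concrete subclass literal such as SomeMapper.class without an unchecked cast; a wildcard bound (Class<? extends Mapper<K1,V1,K2,V2>>) would be friendlier to callers.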

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/TokenCounterMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/TokenCounterMapper.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/TokenCounterMapper.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/TokenCounterMapper.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.map;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * Tokenizes the input values and emits each word with a count of one,
+ * suitable as the map side of a word-count job.
+ */
+public class TokenCounterMapper extends Mapper<Object, Text, Text, IntWritable> {
+    
+  private final static IntWritable one = new IntWritable(1);
+  private Text word = new Text();
+    
+  public void map(Object key, Text value, Context context
+                  ) throws IOException, InterruptedException {
+    StringTokenizer itr = new StringTokenizer(value.toString());
+    while (itr.hasMoreTokens()) {
+      word.set(itr.nextToken());
+      context.collect(word, one);
+    }
+  }
+}
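The mapper above is just StringTokenizer plus the framework's collect call; a standalone sketch of what a single map() invocation would emit for one input line (the collect is simulated with a print):

    import java.util.StringTokenizer;

    public class TokenCountSketch {
      public static void main(String[] args) {
        String line = "the quick brown fox jumps over the lazy dog";
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
          // TokenCounterMapper sets its reusable Text to the token and calls
          // context.collect(word, one); a summing reducer then totals the 1s per word.
          System.out.println(itr.nextToken() + "\t1");
        }
      }
    }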

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/** A base class for {@link OutputFormat}s that write to {@link FileSystem}s.*/
+public abstract class FileOutputFormat<K, V> implements OutputFormat<K, V> {
+
+  private static final String TEMP_DIR_NAME = "_temporary";
+  /**
+   * Set whether the output of the job is compressed.
+   * @param conf the {@link Configuration} to modify
+   * @param compress should the output of the job be compressed?
+   */
+  public static void setCompressOutput(Configuration conf, boolean compress) {
+    conf.setBoolean("mapred.output.compress", compress);
+  }
+  
+  /**
+   * Is the job output compressed?
+   * @param conf the {@link Configuration} to look in
+   * @return <code>true</code> if the job output should be compressed,
+   *         <code>false</code> otherwise
+   */
+  public static boolean getCompressOutput(Configuration conf) {
+    return conf.getBoolean("mapred.output.compress", false);
+  }
+  
+  /**
+   * Set the {@link CompressionCodec} to be used to compress job outputs.
+   * @param conf the {@link Configuration} to modify
+   * @param codecClass the {@link CompressionCodec} to be used to
+   *                   compress the job outputs
+   */
+  public static void 
+  setOutputCompressorClass(Configuration conf, 
+                           Class<? extends CompressionCodec> codecClass) {
+    setCompressOutput(conf, true);
+    conf.setClass("mapred.output.compression.codec", codecClass, 
+                  CompressionCodec.class);
+  }
+  
+  /**
+   * Get the {@link CompressionCodec} for compressing the job outputs.
+   * @param conf the {@link Configuration} to look in
+   * @param defaultValue the {@link CompressionCodec} to return if not set
+   * @return the {@link CompressionCodec} to be used to compress the 
+   *         job outputs
+   * @throws IllegalArgumentException if the class was specified, but not found
+   */
+  public static Class<? extends CompressionCodec> 
+  getOutputCompressorClass(Configuration conf, 
+		                       Class<? extends CompressionCodec> defaultValue) {
+    Class<? extends CompressionCodec> codecClass = defaultValue;
+    
+    String name = conf.get("mapred.output.compression.codec");
+    if (name != null) {
+      try {
+        codecClass = 
+        	conf.getClassByName(name).asSubclass(CompressionCodec.class);
+      } catch (ClassNotFoundException e) {
+        throw new IllegalArgumentException("Compression codec " + name + 
+                                           " was not found.", e);
+      }
+    }
+    return codecClass;
+  }
+  
+  public abstract 
+    RecordWriter<K, V> getRecordWriter(TaskAttemptContext context
+                                       ) throws IOException;
+
+  public void checkOutputSpecs(JobContext context) 
+    throws FileAlreadyExistsException, 
+           InvalidJobConfException, IOException {
+    // Ensure that the output directory is set and not already there
+    Configuration job = context.getConfiguration();
+    Path outDir = getOutputPath(job);
+    if (outDir == null && context.getNumReduceTasks() != 0) {
+      throw new InvalidJobConfException("Output directory not set.");
+    }
+    if (outDir != null && outDir.getFileSystem(job).exists(outDir)) {
+      throw new FileAlreadyExistsException("Output directory " + outDir + 
+                                           " already exists");
+    }
+  }
+
+  /**
+   * Set the {@link Path} of the output directory for the map-reduce job.
+   *
+   * @param conf The configuration of the job.
+   * @param outputDir the {@link Path} of the output directory for 
+   * the map-reduce job.
+   */
+  public static void setOutputPath(Configuration conf, Path outputDir) {
+    conf.set("mapred.output.dir", outputDir.toString());
+  }
+
+  /**
+   * Get the {@link Path} to the output directory for the map-reduce job.
+   * 
+   * @return the {@link Path} to the output directory for the map-reduce job.
+   * @see FileOutputFormat#getWorkOutputPath(Configuration)
+   */
+  public static Path getOutputPath(Configuration conf) {
+    String name = conf.get("mapred.output.dir");
+    return name == null ? null: new Path(name);
+  }
+  
+  /**
+   *  Get the {@link Path} to the task's temporary output directory 
+   *  for the map-reduce job
+   *  
+   * <h4 id="SideEffectFiles">Tasks' Side-Effect Files</h4>
+   * 
+   * <p>Some applications need to create/write-to side-files, which differ from
+   * the actual job-outputs.
+   * 
+   * <p>In such cases there could be issues with 2 instances of the same TIP 
+   * (running simultaneously e.g. speculative tasks) trying to open/write-to the
+   * same file (path) on HDFS. Hence the application-writer will have to pick 
+   * unique names per task-attempt (e.g. using the attemptid, say 
+   * <tt>attempt_200709221812_0001_m_000000_0</tt>), not just per TIP.</p> 
+   * 
+   * <p>To get around this the Map-Reduce framework helps the application-writer 
+   * out by maintaining a special 
+   * <tt>${mapred.output.dir}/_temporary/_${taskid}</tt> 
+   * sub-directory for each task-attempt on HDFS where the output of the 
+   * task-attempt goes. On successful completion of the task-attempt the files 
+   * in the <tt>${mapred.output.dir}/_temporary/_${taskid}</tt> (only) 
+   * are <i>promoted</i> to <tt>${mapred.output.dir}</tt>. Of course, the 
+   * framework discards the sub-directory of unsuccessful task-attempts. This 
+   * is completely transparent to the application.</p>
+   * 
+   * <p>The application-writer can take advantage of this by creating any 
+   * side-files required in <tt>${mapred.work.output.dir}</tt> during execution 
+   * of a map or reduce task, i.e. via {@link #getWorkOutputPath(Configuration)},
+   * and the framework will move them out similarly, so the application does not
+   * have to pick unique paths per task-attempt.</p>
+   * 
+   * <p><i>Note</i>: the value of <tt>${mapred.work.output.dir}</tt> during 
+   * execution of a particular task-attempt is actually 
+   * <tt>${mapred.output.dir}/_temporary/_${taskid}</tt>, and this value is 
+   * set by the map-reduce framework. So, just create any side-files in the 
+   * path  returned by {@link #getWorkOutputPath(Configuration)} from map/reduce 
+   * task to take advantage of this feature.</p>
+   * 
+   * <p>The entire discussion holds true for maps of jobs with 
+   * reducer=NONE (i.e. 0 reduces) since output of the map, in that case, 
+   * goes directly to HDFS.</p> 
+   * 
+   * @return the {@link Path} to the task's temporary output directory 
+   * for the map-reduce job.
+   */
+  public static Path getWorkOutputPath(Configuration conf) {
+    String name = conf.get("mapred.work.output.dir");
+    return name == null ? null: new Path(name);
+  }
+
+  /**
+   * Helper function to create the task's temporary output directory and 
+   * return the path to the task's output file.
+   * 
+   * @param context the task's context
+   * @return path to the task's temporary output file
+   * @throws IOException
+   */
+  protected static Path getTaskOutputPath(TaskAttemptContext context
+                                          ) throws IOException {
+    // ${mapred.job.dir}
+    Configuration conf = context.getConfiguration();
+    Path outputPath = getOutputPath(conf);
+    if (outputPath == null) {
+      throw new IOException("Undefined job output-path");
+    }
+
+    // ${mapred.out.dir}/_temporary
+    Path jobTmpDir = new Path(outputPath, TEMP_DIR_NAME);
+    FileSystem fs = jobTmpDir.getFileSystem(conf);
+    if (!fs.exists(jobTmpDir)) {
+      throw new IOException("The temporary job-output directory " + 
+          jobTmpDir.toString() + " doesn't exist!"); 
+    }
+
+    // ${mapred.out.dir}/_temporary/_${taskid}
+    Path taskTmpDir = getWorkOutputPath(conf);
+    if (!fs.mkdirs(taskTmpDir)) {
+      throw new IOException("Mkdirs failed to create " 
+          + taskTmpDir.toString());
+    }
+    
+    // ${mapred.out.dir}/_temporary/_${taskid}/${name}
+    return new Path(taskTmpDir, getOutputName(context));
+  } 
+
+  /**
+   * Helper function to generate a name that is unique for the task.
+   *
+   * <p>The generated name can be used to create custom files from within the
+   * different tasks for the job, the names for different tasks will not collide
+   * with each other.</p>
+   *
+   * <p>The given name is postfixed with the task type, 'm' for maps, 'r' for
+   * reduces, and the task partition number. For example, given the name 'test'
+   * running on the first map of the job, the generated name will be
+   * 'test-m-00000'.</p>
+   *
+   * @param conf the configuration for the job.
+   * @param name the name to make unique.
+   * @return a unique name across all tasks of the job.
+   */
+  public static String getUniqueName(Configuration conf, String name) {
+    int partition = conf.getInt("mapred.task.partition", -1);
+    if (partition == -1) {
+      throw new IllegalArgumentException(
+        "This method can only be called from within a Job");
+    }
+
+    String taskType = (conf.getBoolean("mapred.task.is.map", true)) ? "m" : "r";
+
+    NumberFormat numberFormat = NumberFormat.getInstance();
+    numberFormat.setMinimumIntegerDigits(5);
+    numberFormat.setGroupingUsed(false);
+
+    return name + "-" + taskType + "-" + numberFormat.format(partition);
+  }
+
+  /**
+   * Helper function to generate a {@link Path} for a file that is unique for
+   * the task within the job output directory.
+   *
+   * <p>The path can be used to create custom files from within the map and
+   * reduce tasks. The path name will be unique for each task. The path parent
+   * will be the job output directory.</p>
+   *
+   * <p>This method uses the {@link #getUniqueName} method to make the file name
+   * unique for the task.</p>
+   *
+   * @param conf the configuration for the job.
+   * @param name the name for the file.
+   * @return a unique path across all tasks of the job.
+   */
+  public static Path getPathForCustomFile(Configuration conf, String name) {
+    return new Path(getWorkOutputPath(conf), getUniqueName(conf, name));
+  }
+
+  /** Construct output file names so that, when an output directory listing is
+   * sorted lexicographically, positions correspond to output partitions.*/
+  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+  static {
+    NUMBER_FORMAT.setMinimumIntegerDigits(5);
+    NUMBER_FORMAT.setGroupingUsed(false);
+  }
+
+  protected static synchronized 
+  String getOutputName(TaskAttemptContext context) {
+    return "part-" + NUMBER_FORMAT.format(context.getTaskAttemptId().getId());
+  }
+}
+
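The side-effect-file contract described in the javadoc can be exercised with the static helpers above. The sketch below assumes it runs inside a task attempt (so mapred.work.output.dir and mapred.task.partition have been set by the framework), and the file name 'debug-dump' is made up:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class SideFileSketch {
      // Writes a per-task side-file under ${mapred.work.output.dir}; on task success
      // the framework promotes it alongside the regular part-xxxxx outputs.
      public static Path writeSideFile(Configuration conf) throws IOException {
        Path sideFile = FileOutputFormat.getPathForCustomFile(conf, "debug-dump");
        FileSystem fs = sideFile.getFileSystem(conf);
        FSDataOutputStream out = fs.create(sideFile);
        try {
          out.writeBytes("per-task diagnostics\n");
        } finally {
          out.close();
        }
        return sideFile;
      }
    }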

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Consume all outputs and put them in /dev/null. 
+ */
+public class NullOutputFormat<K, V> implements OutputFormat<K, V> {
+  
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) {
+    return new RecordWriter<K, V>(){
+        public void write(K key, V value) { }
+        public void close(TaskAttemptContext context) { }
+      };
+  }
+  
+  public void checkOutputSpecs(JobContext context) { }
+}
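Because the returned RecordWriter ignores everything it is given, it can even be exercised outside a job; a minimal sketch (passing null for the unused TaskAttemptContext):

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

    public class NullOutputSketch {
      public static void main(String[] args) throws Exception {
        NullOutputFormat<Text, IntWritable> format = new NullOutputFormat<Text, IntWritable>();
        RecordWriter<Text, IntWritable> writer = format.getRecordWriter(null);
        writer.write(new Text("ignored"), new IntWritable(1));  // silently dropped
        writer.close(null);
      }
    }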

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.conf.Configuration;
+
+/** An {@link OutputFormat} that writes {@link SequenceFile}s. */
+public class SequenceFileOutputFormat <K,V> extends FileOutputFormat<K, V> {
+
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+    throws IOException {
+    // get the path of the temporary output file 
+    Path file = FileOutputFormat.getTaskOutputPath(context);
+    Configuration conf = context.getConfiguration();
+    
+    FileSystem fs = file.getFileSystem(conf);
+    CompressionCodec codec = null;
+    CompressionType compressionType = CompressionType.NONE;
+    if (getCompressOutput(conf)) {
+      // find the kind of compression to do
+      compressionType = getOutputCompressionType(conf);
+
+      // find the right codec
+      Class<?> codecClass = getOutputCompressorClass(conf, DefaultCodec.class);
+      codec = (CompressionCodec) 
+        ReflectionUtils.newInstance(codecClass, conf);
+    }
+    final SequenceFile.Writer out = 
+      SequenceFile.createWriter(fs, conf, file,
+                                context.getOutputKeyClass(),
+                                context.getOutputValueClass(),
+                                compressionType,
+                                codec,
+                                context);
+
+    return new RecordWriter<K, V>() {
+
+        public void write(K key, V value)
+          throws IOException {
+
+          out.append(key, value);
+        }
+
+        public void close(TaskAttemptContext context) throws IOException { 
+          out.close();
+        }
+      };
+  }
+
+  /**
+   * Get the {@link CompressionType} for the output {@link SequenceFile}.
+   * @param conf the {@link Configuration}
+   * @return the {@link CompressionType} for the output {@link SequenceFile}, 
+   *         defaulting to {@link CompressionType#RECORD}
+   */
+  public static CompressionType getOutputCompressionType(Configuration conf) {
+    String val = conf.get("mapred.output.compression.type", 
+                          CompressionType.RECORD.toString());
+    return CompressionType.valueOf(val);
+  }
+  
+  /**
+   * Set the {@link CompressionType} for the output {@link SequenceFile}.
+   * @param conf the {@link Configuration} to modify
+   * @param style the {@link CompressionType} for the output
+   *              {@link SequenceFile} 
+   */
+  public static void setOutputCompressionType(Configuration conf, 
+		                                          CompressionType style) {
+    setCompressOutput(conf, true);
+    conf.set("mapred.output.compression.type", style.toString());
+  }
+
+}
+
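Typical configuration combines the compression-type setter above with the codec setter inherited from FileOutputFormat; a short sketch (BLOCK compression and DefaultCodec are just example choices):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.compress.DefaultCodec;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    public class SequenceFileOutputSetup {
      public static void configure(Configuration conf) {
        // Each setter below also flips mapred.output.compress to true.
        SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK);
        FileOutputFormat.setOutputCompressorClass(conf, DefaultCodec.class);
      }
    }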


