hadoop-common-commits mailing list archives

From: sha...@apache.org
Subject: svn commit: r779939 [1/2] - in /hadoop/core/trunk: ./ src/examples/org/apache/hadoop/examples/ src/mapred/org/apache/hadoop/mapred/lib/ src/mapred/org/apache/hadoop/mapreduce/lib/input/ src/test/mapred/org/apache/hadoop/mapred/lib/ src/test/mapred/org/...
Date: Fri, 29 May 2009 11:59:03 GMT
Author: sharad
Date: Fri May 29 11:59:03 2009
New Revision: 779939

URL: http://svn.apache.org/viewvc?rev=779939&view=rev
Log:
HADOOP-5698. Change org.apache.hadoop.examples.MultiFileWordCount to use new mapreduce api. Contributed by Amareshwari Sriramadasu.
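
For reference, a consolidated sketch of the new-API driver that the patched MultiFileWordCount.run() assembles from the diff below. The standalone class name MultiFileWordCountDriver is hypothetical; the Job setup mirrors the patch, with argument checks and error handling elided.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.MultiFileWordCount;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

public class MultiFileWordCountDriver {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration());
    job.setJobName("MultiFileWordCount");
    job.setJarByClass(MultiFileWordCount.class);
    // the CombineFileInputFormat subclass defined in the patched example
    job.setInputFormatClass(MultiFileWordCount.MyInputFormat.class);
    job.setMapperClass(MultiFileWordCount.MapClass.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPaths(job, args[0]);          // input dir(s)
    FileOutputFormat.setOutputPath(job, new Path(args[1])); // output dir
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Compared with the old API, JobConf plus JobClient.runJob is replaced by Job plus waitForCompletion, and the input format is registered via setInputFormatClass.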

Added:
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReader.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileSplit.java
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/examples/org/apache/hadoop/examples/MultiFileWordCount.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/CombineFileRecordReader.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/CombineFileSplit.java
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=779939&r1=779938&r2=779939&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri May 29 11:59:03 2009
@@ -408,6 +408,9 @@
     Option.withArgPattern. (Giridharan Kesavan and Sharad Agarwal via 
     sharad)
 
+    HADOOP-5698. Change org.apache.hadoop.examples.MultiFileWordCount to 
+    use new mapreduce api. (Amareshwari Sriramadasu via sharad)
+
   OPTIMIZATIONS
 
     HADOOP-5595. NameNode does not need to run a replicator to choose a

Modified: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/MultiFileWordCount.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/MultiFileWordCount.java?rev=779939&r1=779938&r2=779939&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/MultiFileWordCount.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/MultiFileWordCount.java Fri May 29 11:59:03 2009
@@ -18,14 +18,11 @@
 
 package org.apache.hadoop.examples;
 
-import java.io.BufferedReader;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.util.StringTokenizer;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -33,19 +30,18 @@
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.MultiFileInputFormat;
-import org.apache.hadoop.mapred.MultiFileSplit;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.lib.LongSumReducer;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
+import org.apache.hadoop.util.LineReader;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -99,131 +95,129 @@
 
 
   /**
-   * To use {@link MultiFileInputFormat}, one should extend it, to return a 
-   * (custom) {@link RecordReader}. MultiFileInputFormat uses 
-   * {@link MultiFileSplit}s. 
+   * To use {@link CombineFileInputFormat}, one should extend it, to return a 
+   * (custom) {@link RecordReader}. CombineFileInputFormat uses 
+   * {@link CombineFileSplit}s. 
    */
   public static class MyInputFormat 
-    extends MultiFileInputFormat<WordOffset, Text>  {
+    extends CombineFileInputFormat<WordOffset, Text>  {
 
-    @Override
-    public RecordReader<WordOffset,Text> getRecordReader(InputSplit split
-        , JobConf job, Reporter reporter) throws IOException {
-      return new MultiFileLineRecordReader(job, (MultiFileSplit)split);
+    public RecordReader<WordOffset,Text> createRecordReader(InputSplit split,
+        TaskAttemptContext context) throws IOException {
+      return new CombineFileRecordReader<WordOffset, Text>(
+        (CombineFileSplit)split, context, CombineFileLineRecordReader.class);
     }
   }
 
   /**
-   * RecordReader is responsible from extracting records from the InputSplit. 
-   * This record reader accepts a {@link MultiFileSplit}, which encapsulates several 
-   * files, and no file is divided.
+   * RecordReader is responsible from extracting records from a chunk
+   * of the CombineFileSplit. 
    */
-  public static class MultiFileLineRecordReader 
-    implements RecordReader<WordOffset, Text> {
+  public static class CombineFileLineRecordReader 
+    extends RecordReader<WordOffset, Text> {
 
-    private MultiFileSplit split;
-    private long offset; //total offset read so far;
-    private long totLength;
+    private long startOffset; //offset of the chunk;
+    private long end; //end of the chunk;
+    private long pos; // current pos 
     private FileSystem fs;
-    private int count = 0;
-    private Path[] paths;
+    private Path path;
+    private WordOffset key;
+    private Text value;
     
-    private FSDataInputStream currentStream;
-    private BufferedReader currentReader;
+    private FSDataInputStream fileIn;
+    private LineReader reader;
     
-    public MultiFileLineRecordReader(Configuration conf, MultiFileSplit split)
-      throws IOException {
+    public CombineFileLineRecordReader(CombineFileSplit split,
+        TaskAttemptContext context, Integer index) throws IOException {
       
-      this.split = split;
-      fs = FileSystem.get(conf);
-      this.paths = split.getPaths();
-      this.totLength = split.getLength();
-      this.offset = 0;
+      fs = FileSystem.get(context.getConfiguration());
+      this.path = split.getPath(index);
+      this.startOffset = split.getOffset(index);
+      this.end = startOffset + split.getLength(index);
+      boolean skipFirstLine = false;
       
-      //open the first file
-      Path file = paths[count];
-      currentStream = fs.open(file);
-      currentReader = new BufferedReader(new InputStreamReader(currentStream));
+      //open the file
+      fileIn = fs.open(path);
+      if (startOffset != 0) {
+        skipFirstLine = true;
+        --startOffset;
+        fileIn.seek(startOffset);
+      }
+      reader = new LineReader(fileIn);
+      if (skipFirstLine) {  // skip first line and re-establish "startOffset".
+        startOffset += reader.readLine(new Text(), 0,
+                    (int)Math.min((long)Integer.MAX_VALUE, end - startOffset));
+      }
+      this.pos = startOffset;
     }
 
-    public void close() throws IOException { }
-
-    public long getPos() throws IOException {
-      long currentOffset = currentStream == null ? 0 : currentStream.getPos();
-      return offset + currentOffset;
+    public void initialize(InputSplit split, TaskAttemptContext context)
+        throws IOException, InterruptedException {
     }
 
+    public void close() throws IOException { }
+
     public float getProgress() throws IOException {
-      return ((float)getPos()) / totLength;
+      if (startOffset == end) {
+        return 0.0f;
+      } else {
+        return Math.min(1.0f, (pos - startOffset) / (float)(end - startOffset));
+      }
     }
 
-    public boolean next(WordOffset key, Text value) throws IOException {
-      if(count >= split.getNumPaths())
+    public boolean nextKeyValue() throws IOException {
+      if (key == null) {
+        key = new WordOffset();
+        key.fileName = path.getName();
+      }
+      key.offset = pos;
+      if (value == null) {
+        value = new Text();
+      }
+      int newSize = 0;
+      if (pos < end) {
+        newSize = reader.readLine(value);
+        pos += newSize;
+      }
+      if (newSize == 0) {
+        key = null;
+        value = null;
         return false;
-
-      /* Read from file, fill in key and value, if we reach the end of file,
-       * then open the next file and continue from there until all files are
-       * consumed.  
-       */
-      String line;
-      do {
-        line = currentReader.readLine();
-        if(line == null) {
-          //close the file
-          currentReader.close();
-          offset += split.getLength(count);
-          
-          if(++count >= split.getNumPaths()) //if we are done
-            return false;
-          
-          //open a new file
-          Path file = paths[count];
-          currentStream = fs.open(file);
-          currentReader=new BufferedReader(new InputStreamReader(currentStream));
-          key.fileName = file.getName();
-        }
-      } while(line == null);
-      //update the key and value
-      key.offset = currentStream.getPos();
-      value.set(line);
-      
-      return true;
+      } else {
+        return true;
+      }
     }
 
-    public WordOffset createKey() {
-      WordOffset wo = new WordOffset();
-      wo.fileName = paths[0].toString(); //set as the first file
-      return wo;
+    public WordOffset getCurrentKey() 
+        throws IOException, InterruptedException {
+      return key;
     }
 
-    public Text createValue() {
-      return new Text();
+    public Text getCurrentValue() throws IOException, InterruptedException {
+      return value;
     }
   }
 
   /**
    * This Mapper is similar to the one in {@link WordCount.MapClass}.
    */
-  public static class MapClass extends MapReduceBase
-    implements Mapper<WordOffset, Text, Text, IntWritable> {
-
+  public static class MapClass extends 
+      Mapper<WordOffset, Text, Text, IntWritable> {
     private final static IntWritable one = new IntWritable(1);
     private Text word = new Text();
     
-    public void map(WordOffset key, Text value,
-        OutputCollector<Text, IntWritable> output, Reporter reporter)
-        throws IOException {
+    public void map(WordOffset key, Text value, Context context)
+        throws IOException, InterruptedException {
       
       String line = value.toString();
       StringTokenizer itr = new StringTokenizer(line);
       while (itr.hasMoreTokens()) {
         word.set(itr.nextToken());
-        output.collect(word, one);
+        context.write(word, one);
       }
     }
   }
   
-  
   private void printUsage() {
     System.out.println("Usage : multifilewc <input_dir> <output>" );
   }
@@ -232,14 +226,15 @@
 
     if(args.length < 2) {
       printUsage();
-      return 1;
+      return 2;
     }
 
-    JobConf job = new JobConf(getConf(), MultiFileWordCount.class);
+    Job job = new Job(getConf());
     job.setJobName("MultiFileWordCount");
+    job.setJarByClass(MultiFileWordCount.class);
 
     //set the InputFormat of the job to our InputFormat
-    job.setInputFormat(MyInputFormat.class);
+    job.setInputFormatClass(MyInputFormat.class);
     
     // the keys are words (strings)
     job.setOutputKeyClass(Text.class);
@@ -249,15 +244,13 @@
     //use the defined mapper
     job.setMapperClass(MapClass.class);
     //use the WordCount Reducer
-    job.setCombinerClass(LongSumReducer.class);
-    job.setReducerClass(LongSumReducer.class);
+    job.setCombinerClass(IntSumReducer.class);
+    job.setReducerClass(IntSumReducer.class);
 
     FileInputFormat.addInputPaths(job, args[0]);
     FileOutputFormat.setOutputPath(job, new Path(args[1]));
 
-    JobClient.runJob(job);
-    
-    return 0;
+    return job.waitForCompletion(true) ? 0 : 1;
   }
 
   public static void main(String[] args) throws Exception {
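
The new-API CombineFileRecordReader wired up by MyInputFormat above constructs one delegate reader per file chunk of the split, using a constructor of the shape shown by CombineFileLineRecordReader: (CombineFileSplit, TaskAttemptContext, Integer chunk index). A skeletal custom chunk reader would look roughly like this; the class name, key/value types, and stub bodies are illustrative only.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class MyChunkReader extends RecordReader<LongWritable, Text> {
  private final CombineFileSplit split;
  private final int index;   // which file chunk of the split this reader handles

  // constructor shape expected by the new CombineFileRecordReader
  public MyChunkReader(CombineFileSplit split, TaskAttemptContext context,
      Integer index) throws IOException {
    this.split = split;
    this.index = index;
    // open split.getPath(index) at split.getOffset(index) here
  }

  public void initialize(InputSplit split, TaskAttemptContext context) { }
  public boolean nextKeyValue() throws IOException { return false; }  // stub
  public LongWritable getCurrentKey() { return null; }                // stub
  public Text getCurrentValue() { return null; }                      // stub
  public float getProgress() { return 0.0f; }                         // stub
  public void close() throws IOException { }
}

Passing such a class to CombineFileRecordReader, as MyInputFormat.createRecordReader does above, lets a single combined split drive several per-file readers in sequence.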

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java?rev=779939&r1=779938&r2=779939&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java Fri May 29 11:59:03 2009
@@ -19,28 +19,14 @@
 package org.apache.hadoop.mapred.lib;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.HashMap;
-import java.util.Set;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.net.NodeBase;
-import org.apache.hadoop.net.NetworkTopology;
 
+import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 /**
  * An abstract {@link org.apache.hadoop.mapred.InputFormat} that returns {@link CombineFileSplit}'s
@@ -60,72 +46,13 @@
  * Subclasses implement {@link org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit, JobConf, Reporter)}
  * to construct <code>RecordReader</code>'s for <code>CombineFileSplit</code>'s.
  * @see CombineFileSplit
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat}
  */
+@Deprecated
 public abstract class CombineFileInputFormat<K, V>
-  extends FileInputFormat<K, V> {
-
-  // ability to limit the size of a single split
-  private long maxSplitSize = 0;
-  private long minSplitSizeNode = 0;
-  private long minSplitSizeRack = 0;
-
-  // A pool of input paths filters. A split cannot have blocks from files
-  // across multiple pools.
-  private ArrayList<MultiPathFilter> pools = new  ArrayList<MultiPathFilter>();
-
-  // mapping from a rack name to the set of Nodes in the rack 
-  private static HashMap<String, Set<String>> rackToNodes = 
-                            new HashMap<String, Set<String>>();
-  /**
-   * Specify the maximum size (in bytes) of each split. Each split is
-   * approximately equal to the specified size.
-   */
-  protected void setMaxSplitSize(long maxSplitSize) {
-    this.maxSplitSize = maxSplitSize;
-  }
-
-  /**
-   * Specify the minimum size (in bytes) of each split per node.
-   * This applies to data that is left over after combining data on a single
-   * node into splits that are of maximum size specified by maxSplitSize.
-   * This leftover data will be combined into its own split if its size
-   * exceeds minSplitSizeNode.
-   */
-  protected void setMinSplitSizeNode(long minSplitSizeNode) {
-    this.minSplitSizeNode = minSplitSizeNode;
-  }
-
-  /**
-   * Specify the minimum size (in bytes) of each split per rack.
-   * This applies to data that is left over after combining data on a single
-   * rack into splits that are of maximum size specified by maxSplitSize.
-   * This leftover data will be combined into its own split if its size
-   * exceeds minSplitSizeRack.
-   */
-  protected void setMinSplitSizeRack(long minSplitSizeRack) {
-    this.minSplitSizeRack = minSplitSizeRack;
-  }
-
-  /**
-   * Create a new pool and add the filters to it.
-   * A split cannot have files from different pools.
-   */
-  protected void createPool(JobConf conf, List<PathFilter> filters) {
-    pools.add(new MultiPathFilter(filters));
-  }
-
-  /**
-   * Create a new pool and add the filters to it. 
-   * A pathname can satisfy any one of the specified filters.
-   * A split cannot have files from different pools.
-   */
-  protected void createPool(JobConf conf, PathFilter... filters) {
-    MultiPathFilter multi = new MultiPathFilter();
-    for (PathFilter f: filters) {
-      multi.add(f);
-    }
-    pools.add(multi);
-  }
+  extends org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat<K, V> 
+  implements InputFormat<K, V>{
 
   /**
    * default constructor
@@ -133,306 +60,9 @@
   public CombineFileInputFormat() {
   }
 
-  @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) 
     throws IOException {
-
-    long minSizeNode = 0;
-    long minSizeRack = 0;
-    long maxSize = 0;
-
-    // the values specified by setxxxSplitSize() takes precedence over the
-    // values that might have been specified in the config
-    if (minSplitSizeNode != 0) {
-      minSizeNode = minSplitSizeNode;
-    } else {
-      minSizeNode = job.getLong("mapred.min.split.size.per.node", 0);
-    }
-    if (minSplitSizeRack != 0) {
-      minSizeRack = minSplitSizeRack;
-    } else {
-      minSizeRack = job.getLong("mapred.min.split.size.per.rack", 0);
-    }
-    if (maxSplitSize != 0) {
-      maxSize = maxSplitSize;
-    } else {
-      maxSize = job.getLong("mapred.max.split.size", 0);
-    }
-    if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
-      throw new IOException("Minimum split size pernode " + minSizeNode +
-                            " cannot be larger than maximum split size " +
-                            maxSize);
-    }
-    if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
-      throw new IOException("Minimum split size per rack" + minSizeRack +
-                            " cannot be larger than maximum split size " +
-                            maxSize);
-    }
-    if (minSizeRack != 0 && minSizeNode > minSizeRack) {
-      throw new IOException("Minimum split size per node" + minSizeNode +
-                            " cannot be smaller than minimum split size per rack " +
-                            minSizeRack);
-    }
-
-    // all the files in input set
-    Path[] paths = FileUtil.stat2Paths(listStatus(job));
-    List<CombineFileSplit> splits = new ArrayList<CombineFileSplit>();
-    if (paths.length == 0) {
-      return splits.toArray(new CombineFileSplit[splits.size()]);    
-    }
-
-    // In one single iteration, process all the paths in a single pool.
-    // Processing one pool at a time ensures that a split contans paths
-    // from a single pool only.
-    for (MultiPathFilter onepool : pools) {
-      ArrayList<Path> myPaths = new ArrayList<Path>();
-      
-      // pick one input path. If it matches all the filters in a pool,
-      // add it to the output set
-      for (int i = 0; i < paths.length; i++) {
-        if (paths[i] == null) {  // already processed
-          continue;
-        }
-        FileSystem fs = paths[i].getFileSystem(job);
-        Path p = new Path(paths[i].toUri().getPath());
-        if (onepool.accept(p)) {
-          myPaths.add(paths[i]); // add it to my output set
-          paths[i] = null;       // already processed
-        }
-      }
-      // create splits for all files in this pool.
-      getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]), 
-                    maxSize, minSizeNode, minSizeRack, splits);
-    }
-
-    // Finally, process all paths that do not belong to any pool.
-    ArrayList<Path> myPaths = new ArrayList<Path>();
-    for (int i = 0; i < paths.length; i++) {
-      if (paths[i] == null) {  // already processed
-        continue;
-      }
-      myPaths.add(paths[i]);
-    }
-    // create splits for all files that are not in any pool.
-    getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]), 
-                  maxSize, minSizeNode, minSizeRack, splits);
-
-    // free up rackToNodes map
-    rackToNodes.clear();
-    return splits.toArray(new CombineFileSplit[splits.size()]);    
-  }
-
-  /**
-   * Return all the splits in the specified set of paths
-   */
-  private void getMoreSplits(JobConf job, Path[] paths, 
-                             long maxSize, long minSizeNode, long minSizeRack,
-                             List<CombineFileSplit> splits)
-    throws IOException {
-
-    // all blocks for all the files in input set
-    OneFileInfo[] files;
-  
-    // mapping from a rack name to the list of blocks it has
-    HashMap<String, List<OneBlockInfo>> rackToBlocks = 
-                              new HashMap<String, List<OneBlockInfo>>();
-
-    // mapping from a block to the nodes on which it has replicas
-    HashMap<OneBlockInfo, String[]> blockToNodes = 
-                              new HashMap<OneBlockInfo, String[]>();
-
-    // mapping from a node to the list of blocks that it contains
-    HashMap<String, List<OneBlockInfo>> nodeToBlocks = 
-                              new HashMap<String, List<OneBlockInfo>>();
-    
-    files = new OneFileInfo[paths.length];
-    if (paths.length == 0) {
-      return; 
-    }
-
-    // populate all the blocks for all files
-    long totLength = 0;
-    for (int i = 0; i < paths.length; i++) {
-      files[i] = new OneFileInfo(paths[i], job, 
-                                 rackToBlocks, blockToNodes, nodeToBlocks);
-      totLength += files[i].getLength();
-    }
-
-    ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
-    ArrayList<String> nodes = new ArrayList<String>();
-    long curSplitSize = 0;
-
-    // process all nodes and create splits that are local
-    // to a node. 
-    for (Iterator<Map.Entry<String, 
-         List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator(); 
-         iter.hasNext();) {
-
-      Map.Entry<String, List<OneBlockInfo>> one = iter.next();
-      nodes.add(one.getKey());
-      List<OneBlockInfo> blocksInNode = one.getValue();
-
-      // for each block, copy it into validBlocks. Delete it from 
-      // blockToNodes so that the same block does not appear in 
-      // two different splits.
-      for (OneBlockInfo oneblock : blocksInNode) {
-        if (blockToNodes.containsKey(oneblock)) {
-          validBlocks.add(oneblock);
-          blockToNodes.remove(oneblock);
-          curSplitSize += oneblock.length;
-
-          // if the accumulated split size exceeds the maximum, then 
-          // create this split.
-          if (maxSize != 0 && curSplitSize >= maxSize) {
-            // create an input split and add it to the splits array
-            addCreatedSplit(job, splits, nodes, validBlocks);
-            curSplitSize = 0;
-            validBlocks.clear();
-          }
-        }
-      }
-      // if there were any blocks left over and their combined size is
-      // larger than minSplitNode, then combine them into one split.
-      // Otherwise add them back to the unprocessed pool. It is likely 
-      // that they will be combined with other blocks from the same rack later on.
-      if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
-        // create an input split and add it to the splits array
-        addCreatedSplit(job, splits, nodes, validBlocks);
-      } else {
-        for (OneBlockInfo oneblock : validBlocks) {
-          blockToNodes.put(oneblock, oneblock.hosts);
-        }
-      }
-      validBlocks.clear();
-      nodes.clear();
-      curSplitSize = 0;
-    }
-
-    // if blocks in a rack are below the specified minimum size, then keep them
-    // in 'overflow'. After the processing of all racks is complete, these overflow
-    // blocks will be combined into splits.
-    ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
-    ArrayList<String> racks = new ArrayList<String>();
-
-    // Process all racks over and over again until there is no more work to do.
-    while (blockToNodes.size() > 0) {
-
-      // Create one split for this rack before moving over to the next rack. 
-      // Come back to this rack after creating a single split for each of the 
-      // remaining racks.
-      // Process one rack location at a time, Combine all possible blocks that
-      // reside on this rack as one split. (constrained by minimum and maximum
-      // split size).
-
-      // iterate over all racks 
-      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = 
-           rackToBlocks.entrySet().iterator(); iter.hasNext();) {
-
-        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
-        racks.add(one.getKey());
-        List<OneBlockInfo> blocks = one.getValue();
-
-        // for each block, copy it into validBlocks. Delete it from 
-        // blockToNodes so that the same block does not appear in 
-        // two different splits.
-        boolean createdSplit = false;
-        for (OneBlockInfo oneblock : blocks) {
-          if (blockToNodes.containsKey(oneblock)) {
-            validBlocks.add(oneblock);
-            blockToNodes.remove(oneblock);
-            curSplitSize += oneblock.length;
-      
-            // if the accumulated split size exceeds the maximum, then 
-            // create this split.
-            if (maxSize != 0 && curSplitSize >= maxSize) {
-              // create an input split and add it to the splits array
-              addCreatedSplit(job, splits, getHosts(racks), validBlocks);
-              createdSplit = true;
-              break;
-            }
-          }
-        }
-
-        // if we created a split, then just go to the next rack
-        if (createdSplit) {
-          curSplitSize = 0;
-          validBlocks.clear();
-          racks.clear();
-          continue;
-        }
-
-        if (!validBlocks.isEmpty()) {
-          if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
-            // if there is a mimimum size specified, then create a single split
-            // otherwise, store these blocks into overflow data structure
-            addCreatedSplit(job, splits, getHosts(racks), validBlocks);
-          } else {
-            // There were a few blocks in this rack that remained to be processed.
-            // Keep them in 'overflow' block list. These will be combined later.
-            overflowBlocks.addAll(validBlocks);
-          }
-        }
-        curSplitSize = 0;
-        validBlocks.clear();
-        racks.clear();
-      }
-    }
-
-    assert blockToNodes.isEmpty();
-    assert curSplitSize == 0;
-    assert validBlocks.isEmpty();
-    assert racks.isEmpty();
-
-    // Process all overflow blocks
-    for (OneBlockInfo oneblock : overflowBlocks) {
-      validBlocks.add(oneblock);
-      curSplitSize += oneblock.length;
-
-      // This might cause an exiting rack location to be re-added,
-      // but it should be ok.
-      for (int i = 0; i < oneblock.racks.length; i++) {
-        racks.add(oneblock.racks[i]);
-      }
-
-      // if the accumulated split size exceeds the maximum, then 
-      // create this split.
-      if (maxSize != 0 && curSplitSize >= maxSize) {
-        // create an input split and add it to the splits array
-        addCreatedSplit(job, splits, getHosts(racks), validBlocks);
-        curSplitSize = 0;
-        validBlocks.clear();
-        racks.clear();
-      }
-    }
-
-    // Process any remaining blocks, if any.
-    if (!validBlocks.isEmpty()) {
-      addCreatedSplit(job, splits, getHosts(racks), validBlocks);
-    }
-  }
-
-  /**
-   * Create a single split from the list of blocks specified in validBlocks
-   * Add this new split into splitList.
-   */
-  private void addCreatedSplit(JobConf job,
-                               List<CombineFileSplit> splitList, 
-                               List<String> locations, 
-                               ArrayList<OneBlockInfo> validBlocks) {
-    // create an input split
-    Path[] fl = new Path[validBlocks.size()];
-    long[] offset = new long[validBlocks.size()];
-    long[] length = new long[validBlocks.size()];
-    for (int i = 0; i < validBlocks.size(); i++) {
-      fl[i] = validBlocks.get(i).onepath; 
-      offset[i] = validBlocks.get(i).offset;
-      length[i] = validBlocks.get(i).length;
-    }
-
-     // add this split to the list that is returned
-    CombineFileSplit thissplit = new CombineFileSplit(job, fl, offset, 
-                                   length, locations.toArray(new String[0]));
-    splitList.add(thissplit); 
+    return super.getSplits(new Job(job)).toArray(new InputSplit[0]);
   }
 
   /**
@@ -442,171 +72,11 @@
                                       JobConf job, Reporter reporter)
     throws IOException;
 
-  /**
-   * information about one file from the File System
-   */
-  private static class OneFileInfo {
-    private long fileSize;               // size of the file
-    private OneBlockInfo[] blocks;       // all blocks in this file
-
-    OneFileInfo(Path path, JobConf job,
-                HashMap<String, List<OneBlockInfo>> rackToBlocks,
-                HashMap<OneBlockInfo, String[]> blockToNodes,
-                HashMap<String, List<OneBlockInfo>> nodeToBlocks)
-                throws IOException {
-      this.fileSize = 0;
-
-      // get block locations from file system
-      FileSystem fs = path.getFileSystem(job);
-      FileStatus stat = fs.getFileStatus(path);
-      BlockLocation[] locations = fs.getFileBlockLocations(stat, 0, 
-                                                           stat.getLen());
-      // create a list of all block and their locations
-      if (locations == null) {
-        blocks = new OneBlockInfo[0];
-      } else {
-        blocks = new OneBlockInfo[locations.length];
-        for (int i = 0; i < locations.length; i++) {
-           
-          fileSize += locations[i].getLength();
-          OneBlockInfo oneblock =  new OneBlockInfo(path, 
-                                       locations[i].getOffset(), 
-                                       locations[i].getLength(),
-                                       locations[i].getHosts(),
-                                       locations[i].getTopologyPaths()); 
-          blocks[i] = oneblock;
-
-          // add this block to the block --> node locations map
-          blockToNodes.put(oneblock, oneblock.hosts);
-
-          // add this block to the rack --> block map
-          for (int j = 0; j < oneblock.racks.length; j++) {
-            String rack = oneblock.racks[j];
-            List<OneBlockInfo> blklist = rackToBlocks.get(rack);
-            if (blklist == null) {
-              blklist = new ArrayList<OneBlockInfo>();
-              rackToBlocks.put(rack, blklist);
-            }
-            blklist.add(oneblock);
-            // Add this host to rackToNodes map
-            addHostToRack(oneblock.racks[j], oneblock.hosts[j]);
-         }
-
-          // add this block to the node --> block map
-          for (int j = 0; j < oneblock.hosts.length; j++) {
-            String node = oneblock.hosts[j];
-            List<OneBlockInfo> blklist = nodeToBlocks.get(node);
-            if (blklist == null) {
-              blklist = new ArrayList<OneBlockInfo>();
-              nodeToBlocks.put(node, blklist);
-            }
-            blklist.add(oneblock);
-          }
-        }
-      }
-    }
-
-    long getLength() {
-      return fileSize;
-    }
-
-    OneBlockInfo[] getBlocks() {
-      return blocks;
-    }
-  }
-
-  /**
-   * information about one block from the File System
-   */
-  private static class OneBlockInfo {
-    Path onepath;                // name of this file
-    long offset;                 // offset in file
-    long length;                 // length of this block
-    String[] hosts;              // nodes on whch this block resides
-    String[] racks;              // network topology of hosts
-
-    OneBlockInfo(Path path, long offset, long len, 
-                 String[] hosts, String[] topologyPaths) {
-      this.onepath = path;
-      this.offset = offset;
-      this.hosts = hosts;
-      this.length = len;
-      assert (hosts.length == topologyPaths.length ||
-              topologyPaths.length == 0);
-
-      // if the file ystem does not have any rack information, then
-      // use dummy rack location.
-      if (topologyPaths.length == 0) {
-        topologyPaths = new String[hosts.length];
-        for (int i = 0; i < topologyPaths.length; i++) {
-          topologyPaths[i] = (new NodeBase(hosts[i], NetworkTopology.DEFAULT_RACK)).
-                                          toString();
-        }
-      }
-
-      // The topology paths have the host name included as the last 
-      // component. Strip it.
-      this.racks = new String[topologyPaths.length];
-      for (int i = 0; i < topologyPaths.length; i++) {
-        this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation();
-      }
-    }
+  // abstract method from super class implemented to return null
+  public org.apache.hadoop.mapreduce.RecordReader<K, V> createRecordReader(
+      org.apache.hadoop.mapreduce.InputSplit split,
+      TaskAttemptContext context) throws IOException {
+    return null;
   }
 
-  private static void addHostToRack(String rack, String host) {
-    Set<String> hosts = rackToNodes.get(rack);
-    if (hosts == null) {
-      hosts = new HashSet<String>();
-      rackToNodes.put(rack, hosts);
-    }
-    hosts.add(host);
-  }
-  
-  private static List<String> getHosts(List<String> racks) {
-    List<String> hosts = new ArrayList<String>();
-    for (String rack : racks) {
-      hosts.addAll(rackToNodes.get(rack));
-    }
-    return hosts;
-  }
-  
-  /**
-   * Accept a path only if any one of filters given in the
-   * constructor do. 
-   */
-  private static class MultiPathFilter implements PathFilter {
-    private List<PathFilter> filters;
-
-    public MultiPathFilter() {
-      this.filters = new ArrayList<PathFilter>();
-    }
-
-    public MultiPathFilter(List<PathFilter> filters) {
-      this.filters = filters;
-    }
-
-    public void add(PathFilter one) {
-      filters.add(one);
-    }
-
-    public boolean accept(Path path) {
-      for (PathFilter filter : filters) {
-        if (filter.accept(path)) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    public String toString() {
-      StringBuffer buf = new StringBuffer();
-      buf.append("[");
-      for (PathFilter f: filters) {
-        buf.append(f);
-        buf.append(",");
-      }
-      buf.append("]");
-      return buf.toString();
-    }
-  }
 }
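
With this change the old org.apache.hadoop.mapred.lib.CombineFileInputFormat becomes a deprecated shim: getSplits(JobConf, int) wraps the JobConf in a Job and delegates to the new mapreduce implementation. The split-size knobs are unchanged; a rough sketch of driving them from configuration, using the property names read by the removed code above and by the new class added further down (the byte values here are arbitrary examples, not defaults):

import org.apache.hadoop.conf.Configuration;

public class SplitSizeConfig {
  public static Configuration configure(Configuration conf) {
    // cap each combined split at 128 MB
    conf.setLong("mapred.max.split.size", 128L * 1024 * 1024);
    // combine node-local leftovers into their own split if >= 32 MB
    conf.setLong("mapred.min.split.size.per.node", 32L * 1024 * 1024);
    // combine rack-local leftovers into their own split if >= 64 MB
    conf.setLong("mapred.min.split.size.per.rack", 64L * 1024 * 1024);
    return conf;
  }
}

Values passed programmatically through setMaxSplitSize, setMinSplitSizeNode, and setMinSplitSizeRack take precedence over these properties.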

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/CombineFileRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/CombineFileRecordReader.java?rev=779939&r1=779938&r2=779939&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/CombineFileRecordReader.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/CombineFileRecordReader.java Fri May 29 11:59:03 2009
@@ -19,11 +19,9 @@
 package org.apache.hadoop.mapred.lib;
 
 import java.io.*;
-import java.util.*;
 import java.lang.reflect.*;
 
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.conf.Configuration;
@@ -35,8 +33,10 @@
  * This class allows using different RecordReaders for processing
  * these data chunks from different files.
  * @see CombineFileSplit
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader}
  */
-
+@Deprecated
 public class CombineFileRecordReader<K, V> implements RecordReader<K, V> {
 
   static final Class [] constructorSignature = new Class [] 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/CombineFileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/CombineFileSplit.java?rev=779939&r1=779938&r2=779939&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/CombineFileSplit.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/CombineFileSplit.java Fri May 29 11:59:03 2009
@@ -18,188 +18,30 @@
 
 package org.apache.hadoop.mapred.lib;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.io.Text;
-
 import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
 
 /**
- * A sub-collection of input files. Unlike {@link org.apache.hadoop.mapred.FileSplit}, 
- * CombineFileSplit * class does not represent a split of a file, but a split of input files 
- * into smaller sets. A split may contain blocks from different file but all 
- * the blocks in the same split are probably local to some rack <br> 
- * CombineFileSplit can be used to implement {@link org.apache.hadoop.mapred.RecordReader}'s, 
- * with reading one record per file.
- * @see org.apache.hadoop.mapred.FileSplit
- * @see CombineFileInputFormat 
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.input.CombineFileSplit}
  */
-public class CombineFileSplit implements InputSplit {
+@Deprecated
+public class CombineFileSplit extends 
+    org.apache.hadoop.mapreduce.lib.input.CombineFileSplit 
+    implements InputSplit {
 
-  private Path[] paths;
-  private long[] startoffset;
-  private long[] lengths;
-  private String[] locations;
-  private long totLength;
   private JobConf job;
 
-  /**
-   * default constructor
-   */
-  public CombineFileSplit() {}
-  public CombineFileSplit(JobConf job, Path[] files, long[] start, 
-                          long[] lengths, String[] locations) {
-    initSplit(job, files, start, lengths, locations);
+  public CombineFileSplit() {
   }
 
   public CombineFileSplit(JobConf job, Path[] files, long[] lengths) {
-    long[] startoffset = new long[files.length];
-    for (int i = 0; i < startoffset.length; i++) {
-      startoffset[i] = 0;
-    }
-    String[] locations = new String[files.length];
-    for (int i = 0; i < locations.length; i++) {
-      locations[i] = "";
-    }
-    initSplit(job, files, startoffset, lengths, locations);
-  }
-  
-  private void initSplit(JobConf job, Path[] files, long[] start, 
-                         long[] lengths, String[] locations) {
+    super(files, lengths);
     this.job = job;
-    this.startoffset = start;
-    this.lengths = lengths;
-    this.paths = files;
-    this.totLength = 0;
-    this.locations = locations;
-    for(long length : lengths) {
-      totLength += length;
-    }
-  }
-
-  /**
-   * Copy constructor
-   */
-  public CombineFileSplit(CombineFileSplit old) throws IOException {
-    this(old.getJob(), old.getPaths(), old.getStartOffsets(),
-         old.getLengths(), old.getLocations());
   }
 
   public JobConf getJob() {
     return job;
   }
-
-  public long getLength() {
-    return totLength;
-  }
-
-  /** Returns an array containing the startoffsets of the files in the split*/ 
-  public long[] getStartOffsets() {
-    return startoffset;
-  }
-  
-  /** Returns an array containing the lengths of the files in the split*/ 
-  public long[] getLengths() {
-    return lengths;
-  }
-
-  /** Returns the start offset of the i<sup>th</sup> Path */
-  public long getOffset(int i) {
-    return startoffset[i];
-  }
-  
-  /** Returns the length of the i<sup>th</sup> Path */
-  public long getLength(int i) {
-    return lengths[i];
-  }
-  
-  /** Returns the number of Paths in the split */
-  public int getNumPaths() {
-    return paths.length;
-  }
-
-  /** Returns the i<sup>th</sup> Path */
-  public Path getPath(int i) {
-    return paths[i];
-  }
-  
-  /** Returns all the Paths in the split */
-  public Path[] getPaths() {
-    return paths;
-  }
-
-  /** Returns all the Paths where this input-split resides */
-  public String[] getLocations() throws IOException {
-    return locations;
-  }
-
-  public void readFields(DataInput in) throws IOException {
-    totLength = in.readLong();
-    int arrLength = in.readInt();
-    lengths = new long[arrLength];
-    for(int i=0; i<arrLength;i++) {
-      lengths[i] = in.readLong();
-    }
-    int filesLength = in.readInt();
-    paths = new Path[filesLength];
-    for(int i=0; i<filesLength;i++) {
-      paths[i] = new Path(Text.readString(in));
-    }
-    arrLength = in.readInt();
-    startoffset = new long[arrLength];
-    for(int i=0; i<arrLength;i++) {
-      startoffset[i] = in.readLong();
-    }
-  }
-
-  public void write(DataOutput out) throws IOException {
-    out.writeLong(totLength);
-    out.writeInt(lengths.length);
-    for(long length : lengths) {
-      out.writeLong(length);
-    }
-    out.writeInt(paths.length);
-    for(Path p : paths) {
-      Text.writeString(out, p.toString());
-    }
-    out.writeInt(startoffset.length);
-    for(long length : startoffset) {
-      out.writeLong(length);
-    }
-  }
-  
-  @Override
- public String toString() {
-    StringBuffer sb = new StringBuffer();
-    for (int i = 0; i < paths.length; i++) {
-      if (i == 0 ) {
-        sb.append("Paths:");
-      }
-      sb.append(paths[i].toUri().getPath() + ":" + startoffset[i] +
-                "+" + lengths[i]);
-      if (i < paths.length -1) {
-        sb.append(",");
-      }
-    }
-    if (locations != null) {
-      String locs = "";
-      StringBuffer locsb = new StringBuffer();
-      for (int i = 0; i < locations.length; i++) {
-        locsb.append(locations[i] + ":");
-      }
-      locs = locsb.toString();
-      sb.append(" Locations:" + locs + "; ");
-    }
-    return sb.toString();
-  }
 }
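
The per-chunk accessors the deprecated class no longer redefines (getNumPaths, getPath, getOffset, getLength) are now inherited from org.apache.hadoop.mapreduce.lib.input.CombineFileSplit, and they are what CombineFileLineRecordReader above relies on. A small illustrative helper, not part of the patch, that walks a split chunk by chunk:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class SplitDump {
  // Render each chunk of a combined split as "file:offset+length".
  public static String describe(CombineFileSplit split) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < split.getNumPaths(); i++) {
      Path p = split.getPath(i);                  // i-th file in the split
      sb.append(p.getName())
        .append(':').append(split.getOffset(i))   // start offset of the chunk
        .append('+').append(split.getLength(i))   // length of the chunk
        .append('\n');
    }
    return sb.toString();
  }
}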

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java?rev=779939&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java Fri May 29 11:59:03 2009
@@ -0,0 +1,617 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.net.NetworkTopology;
+
+/**
+ * An abstract {@link InputFormat} that returns {@link CombineFileSplit}'s in 
+ * {@link InputFormat#getSplits(JobContext)} method. 
+ * 
+ * Splits are constructed from the files under the input paths. 
+ * A split cannot have files from different pools.
+ * Each split returned may contain blocks from different files.
+ * If a maxSplitSize is specified, then blocks on the same node are
+ * combined to form a single split. Blocks that are left over are
+ * then combined with other blocks in the same rack. 
+ * If maxSplitSize is not specified, then blocks from the same rack
+ * are combined in a single split; no attempt is made to create
+ * node-local splits.
+ * If the maxSplitSize is equal to the block size, then this class
+ * is similar to the default splitting behavior in Hadoop: each
+ * block is a locally processed split.
+ * Subclasses implement 
+ * {@link InputFormat#createRecordReader(InputSplit, TaskAttemptContext)}
+ * to construct <code>RecordReader</code>'s for 
+ * <code>CombineFileSplit</code>'s.
+ * 
+ * @see CombineFileSplit
+ */
+public abstract class CombineFileInputFormat<K, V>
+  extends FileInputFormat<K, V> {
+
+  // ability to limit the size of a single split
+  private long maxSplitSize = 0;
+  private long minSplitSizeNode = 0;
+  private long minSplitSizeRack = 0;
+
+  // A pool of input paths filters. A split cannot have blocks from files
+  // across multiple pools.
+  private ArrayList<MultiPathFilter> pools = new  ArrayList<MultiPathFilter>();
+
+  // mapping from a rack name to the set of Nodes in the rack 
+  private static HashMap<String, Set<String>> rackToNodes = 
+                            new HashMap<String, Set<String>>();
+  /**
+   * Specify the maximum size (in bytes) of each split. Each split is
+   * approximately equal to the specified size.
+   */
+  protected void setMaxSplitSize(long maxSplitSize) {
+    this.maxSplitSize = maxSplitSize;
+  }
+
+  /**
+   * Specify the minimum size (in bytes) of each split per node.
+   * This applies to data that is left over after combining data on a single
+   * node into splits that are of maximum size specified by maxSplitSize.
+   * This leftover data will be combined into its own split if its size
+   * exceeds minSplitSizeNode.
+   */
+  protected void setMinSplitSizeNode(long minSplitSizeNode) {
+    this.minSplitSizeNode = minSplitSizeNode;
+  }
+
+  /**
+   * Specify the minimum size (in bytes) of each split per rack.
+   * This applies to data that is left over after combining data on a single
+   * rack into splits that are of maximum size specified by maxSplitSize.
+   * This leftover data will be combined into its own split if its size
+   * exceeds minSplitSizeRack.
+   */
+  protected void setMinSplitSizeRack(long minSplitSizeRack) {
+    this.minSplitSizeRack = minSplitSizeRack;
+  }
+
+  /**
+   * Create a new pool and add the filters to it.
+   * A split cannot have files from different pools.
+   */
+  protected void createPool(List<PathFilter> filters) {
+    pools.add(new MultiPathFilter(filters));
+  }
+
+  /**
+   * Create a new pool and add the filters to it. 
+   * A pathname can satisfy any one of the specified filters.
+   * A split cannot have files from different pools.
+   */
+  protected void createPool(PathFilter... filters) {
+    MultiPathFilter multi = new MultiPathFilter();
+    for (PathFilter f: filters) {
+      multi.add(f);
+    }
+    pools.add(multi);
+  }
+
+  /**
+   * default constructor
+   */
+  public CombineFileInputFormat() {
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext job) 
+    throws IOException {
+
+    long minSizeNode = 0;
+    long minSizeRack = 0;
+    long maxSize = 0;
+    Configuration conf = job.getConfiguration();
+
+    // the values specified by setxxxSplitSize() takes precedence over the
+    // values that might have been specified in the config
+    if (minSplitSizeNode != 0) {
+      minSizeNode = minSplitSizeNode;
+    } else {
+      minSizeNode = conf.getLong("mapred.min.split.size.per.node", 0);
+    }
+    if (minSplitSizeRack != 0) {
+      minSizeRack = minSplitSizeRack;
+    } else {
+      minSizeRack = conf.getLong("mapred.min.split.size.per.rack", 0);
+    }
+    if (maxSplitSize != 0) {
+      maxSize = maxSplitSize;
+    } else {
+      maxSize = conf.getLong("mapred.max.split.size", 0);
+    }
+    if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
+      throw new IOException("Minimum split size pernode " + minSizeNode +
+                            " cannot be larger than maximum split size " +
+                            maxSize);
+    }
+    if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
+      throw new IOException("Minimum split size per rack" + minSizeRack +
+                            " cannot be larger than maximum split size " +
+                            maxSize);
+    }
+    if (minSizeRack != 0 && minSizeNode > minSizeRack) {
+      throw new IOException("Minimum split size per node" + minSizeNode +
+                            " cannot be smaller than minimum split " +
+                            "size per rack " + minSizeRack);
+    }
+
+    // all the files in input set
+    Path[] paths = FileUtil.stat2Paths(
+                     listStatus(job).toArray(new FileStatus[0]));
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    if (paths.length == 0) {
+      return splits;    
+    }
+
+    // In one single iteration, process all the paths in a single pool.
+    // Processing one pool at a time ensures that a split contains paths
+    // from a single pool only.
+    for (MultiPathFilter onepool : pools) {
+      ArrayList<Path> myPaths = new ArrayList<Path>();
+      
+      // pick one input path. If it matches all the filters in a pool,
+      // add it to the output set
+      for (int i = 0; i < paths.length; i++) {
+        if (paths[i] == null) {  // already processed
+          continue;
+        }
+        Path p = new Path(paths[i].toUri().getPath());
+        if (onepool.accept(p)) {
+          myPaths.add(paths[i]); // add it to my output set
+          paths[i] = null;       // already processed
+        }
+      }
+      // create splits for all files in this pool.
+      getMoreSplits(conf, myPaths.toArray(new Path[myPaths.size()]), 
+                    maxSize, minSizeNode, minSizeRack, splits);
+    }
+
+    // Finally, process all paths that do not belong to any pool.
+    ArrayList<Path> myPaths = new ArrayList<Path>();
+    for (int i = 0; i < paths.length; i++) {
+      if (paths[i] == null) {  // already processed
+        continue;
+      }
+      myPaths.add(paths[i]);
+    }
+    // create splits for all files that are not in any pool.
+    getMoreSplits(conf, myPaths.toArray(new Path[myPaths.size()]), 
+                  maxSize, minSizeNode, minSizeRack, splits);
+
+    // free up rackToNodes map
+    rackToNodes.clear();
+    return splits;    
+  }
+
+  /**
+   * Return all the splits in the specified set of paths
+   */
+  private void getMoreSplits(Configuration conf, Path[] paths, 
+                             long maxSize, long minSizeNode, long minSizeRack,
+                             List<InputSplit> splits)
+    throws IOException {
+
+    // all blocks for all the files in input set
+    OneFileInfo[] files;
+  
+    // mapping from a rack name to the list of blocks it has
+    HashMap<String, List<OneBlockInfo>> rackToBlocks = 
+                              new HashMap<String, List<OneBlockInfo>>();
+
+    // mapping from a block to the nodes on which it has replicas
+    HashMap<OneBlockInfo, String[]> blockToNodes = 
+                              new HashMap<OneBlockInfo, String[]>();
+
+    // mapping from a node to the list of blocks that it contains
+    HashMap<String, List<OneBlockInfo>> nodeToBlocks = 
+                              new HashMap<String, List<OneBlockInfo>>();
+    
+    files = new OneFileInfo[paths.length];
+    if (paths.length == 0) {
+      return; 
+    }
+
+    // populate all the blocks for all files
+    long totLength = 0;
+    for (int i = 0; i < paths.length; i++) {
+      files[i] = new OneFileInfo(paths[i], conf, 
+                                 rackToBlocks, blockToNodes, nodeToBlocks);
+      totLength += files[i].getLength();
+    }
+
+    ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
+    ArrayList<String> nodes = new ArrayList<String>();
+    long curSplitSize = 0;
+
+    // process all nodes and create splits that are local
+    // to a node. 
+    for (Iterator<Map.Entry<String, 
+         List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator(); 
+         iter.hasNext();) {
+
+      Map.Entry<String, List<OneBlockInfo>> one = iter.next();
+      nodes.add(one.getKey());
+      List<OneBlockInfo> blocksInNode = one.getValue();
+
+      // for each block, copy it into validBlocks. Delete it from 
+      // blockToNodes so that the same block does not appear in 
+      // two different splits.
+      for (OneBlockInfo oneblock : blocksInNode) {
+        if (blockToNodes.containsKey(oneblock)) {
+          validBlocks.add(oneblock);
+          blockToNodes.remove(oneblock);
+          curSplitSize += oneblock.length;
+
+          // if the accumulated split size exceeds the maximum, then 
+          // create this split.
+          if (maxSize != 0 && curSplitSize >= maxSize) {
+            // create an input split and add it to the splits array
+            addCreatedSplit(splits, nodes, validBlocks);
+            curSplitSize = 0;
+            validBlocks.clear();
+          }
+        }
+      }
+      // if there were any blocks left over and their combined size is
+      // larger than minSplitNode, then combine them into one split.
+      // Otherwise add them back to the unprocessed pool. It is likely 
+      // that they will be combined with other blocks from the 
+      // same rack later on.
+      if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
+        // create an input split and add it to the splits array
+        addCreatedSplit(splits, nodes, validBlocks);
+      } else {
+        for (OneBlockInfo oneblock : validBlocks) {
+          blockToNodes.put(oneblock, oneblock.hosts);
+        }
+      }
+      validBlocks.clear();
+      nodes.clear();
+      curSplitSize = 0;
+    }
+
+    // if blocks in a rack are below the specified minimum size, then keep them
+    // in 'overflow'. After the processing of all racks is complete, these 
+    // overflow blocks will be combined into splits.
+    ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
+    ArrayList<String> racks = new ArrayList<String>();
+
+    // Process all racks over and over again until there is no more work to do.
+    while (blockToNodes.size() > 0) {
+
+      // Create one split for this rack before moving over to the next rack. 
+      // Come back to this rack after creating a single split for each of the 
+      // remaining racks.
+      // Process one rack location at a time: combine all possible blocks that
+      // reside on this rack into one split, constrained by the minimum and
+      // maximum split size.
+
+      // iterate over all racks 
+      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = 
+           rackToBlocks.entrySet().iterator(); iter.hasNext();) {
+
+        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
+        racks.add(one.getKey());
+        List<OneBlockInfo> blocks = one.getValue();
+
+        // for each block, copy it into validBlocks. Delete it from 
+        // blockToNodes so that the same block does not appear in 
+        // two different splits.
+        boolean createdSplit = false;
+        for (OneBlockInfo oneblock : blocks) {
+          if (blockToNodes.containsKey(oneblock)) {
+            validBlocks.add(oneblock);
+            blockToNodes.remove(oneblock);
+            curSplitSize += oneblock.length;
+      
+            // if the accumulated split size exceeds the maximum, then 
+            // create this split.
+            if (maxSize != 0 && curSplitSize >= maxSize) {
+              // create an input split and add it to the splits array
+              addCreatedSplit(splits, getHosts(racks), validBlocks);
+              createdSplit = true;
+              break;
+            }
+          }
+        }
+
+        // if we created a split, then just go to the next rack
+        if (createdSplit) {
+          curSplitSize = 0;
+          validBlocks.clear();
+          racks.clear();
+          continue;
+        }
+
+        if (!validBlocks.isEmpty()) {
+          if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
+            // if there is a minimum size specified, then create a single split
+            // otherwise, store these blocks into overflow data structure
+            addCreatedSplit(splits, getHosts(racks), validBlocks);
+          } else {
+            // There were a few blocks in this rack that remained to be
+            // processed. Keep them in the 'overflow' block list. These
+            // will be combined later.
+            overflowBlocks.addAll(validBlocks);
+          }
+        }
+        curSplitSize = 0;
+        validBlocks.clear();
+        racks.clear();
+      }
+    }
+
+    assert blockToNodes.isEmpty();
+    assert curSplitSize == 0;
+    assert validBlocks.isEmpty();
+    assert racks.isEmpty();
+
+    // Process all overflow blocks
+    for (OneBlockInfo oneblock : overflowBlocks) {
+      validBlocks.add(oneblock);
+      curSplitSize += oneblock.length;
+
+      // This might cause an existing rack location to be re-added,
+      // but it should be ok.
+      for (int i = 0; i < oneblock.racks.length; i++) {
+        racks.add(oneblock.racks[i]);
+      }
+
+      // if the accumulated split size exceeds the maximum, then 
+      // create this split.
+      if (maxSize != 0 && curSplitSize >= maxSize) {
+        // create an input split and add it to the splits array
+        addCreatedSplit(splits, getHosts(racks), validBlocks);
+        curSplitSize = 0;
+        validBlocks.clear();
+        racks.clear();
+      }
+    }
+
+    // Process any remaining blocks.
+    if (!validBlocks.isEmpty()) {
+      addCreatedSplit(splits, getHosts(racks), validBlocks);
+    }
+  }
+
+  /**
+   * Create a single split from the list of blocks specified in validBlocks.
+   * Add this new split into splitList.
+   */
+  private void addCreatedSplit(List<InputSplit> splitList, 
+                               List<String> locations, 
+                               ArrayList<OneBlockInfo> validBlocks) {
+    // create an input split
+    Path[] fl = new Path[validBlocks.size()];
+    long[] offset = new long[validBlocks.size()];
+    long[] length = new long[validBlocks.size()];
+    for (int i = 0; i < validBlocks.size(); i++) {
+      fl[i] = validBlocks.get(i).onepath; 
+      offset[i] = validBlocks.get(i).offset;
+      length[i] = validBlocks.get(i).length;
+    }
+
+     // add this split to the list that is returned
+    CombineFileSplit thissplit = new CombineFileSplit(fl, offset, 
+                                   length, locations.toArray(new String[0]));
+    splitList.add(thissplit); 
+  }
+
+  /**
+   * This is not implemented in this class; a concrete subclass must
+   * return a RecordReader for the given split.
+   */
+  public abstract RecordReader<K, V> createRecordReader(InputSplit split,
+      TaskAttemptContext context) throws IOException;
+
+  /**
+   * information about one file from the File System
+   */
+  private static class OneFileInfo {
+    private long fileSize;               // size of the file
+    private OneBlockInfo[] blocks;       // all blocks in this file
+
+    OneFileInfo(Path path, Configuration conf,
+                HashMap<String, List<OneBlockInfo>> rackToBlocks,
+                HashMap<OneBlockInfo, String[]> blockToNodes,
+                HashMap<String, List<OneBlockInfo>> nodeToBlocks)
+                throws IOException {
+      this.fileSize = 0;
+
+      // get block locations from file system
+      FileSystem fs = path.getFileSystem(conf);
+      FileStatus stat = fs.getFileStatus(path);
+      BlockLocation[] locations = fs.getFileBlockLocations(stat, 0, 
+                                                           stat.getLen());
+      // create a list of all blocks and their locations
+      if (locations == null) {
+        blocks = new OneBlockInfo[0];
+      } else {
+        blocks = new OneBlockInfo[locations.length];
+        for (int i = 0; i < locations.length; i++) {
+           
+          fileSize += locations[i].getLength();
+          OneBlockInfo oneblock =  new OneBlockInfo(path, 
+                                       locations[i].getOffset(), 
+                                       locations[i].getLength(),
+                                       locations[i].getHosts(),
+                                       locations[i].getTopologyPaths()); 
+          blocks[i] = oneblock;
+
+          // add this block to the block --> node locations map
+          blockToNodes.put(oneblock, oneblock.hosts);
+
+          // add this block to the rack --> block map
+          for (int j = 0; j < oneblock.racks.length; j++) {
+            String rack = oneblock.racks[j];
+            List<OneBlockInfo> blklist = rackToBlocks.get(rack);
+            if (blklist == null) {
+              blklist = new ArrayList<OneBlockInfo>();
+              rackToBlocks.put(rack, blklist);
+            }
+            blklist.add(oneblock);
+            // Add this host to rackToNodes map
+            addHostToRack(oneblock.racks[j], oneblock.hosts[j]);
+          }
+
+          // add this block to the node --> block map
+          for (int j = 0; j < oneblock.hosts.length; j++) {
+            String node = oneblock.hosts[j];
+            List<OneBlockInfo> blklist = nodeToBlocks.get(node);
+            if (blklist == null) {
+              blklist = new ArrayList<OneBlockInfo>();
+              nodeToBlocks.put(node, blklist);
+            }
+            blklist.add(oneblock);
+          }
+        }
+      }
+    }
+
+    long getLength() {
+      return fileSize;
+    }
+
+    OneBlockInfo[] getBlocks() {
+      return blocks;
+    }
+  }
+
+  /**
+   * information about one block from the File System
+   */
+  private static class OneBlockInfo {
+    Path onepath;                // name of this file
+    long offset;                 // offset in file
+    long length;                 // length of this block
+    String[] hosts;              // nodes on which this block resides
+    String[] racks;              // network topology of hosts
+
+    OneBlockInfo(Path path, long offset, long len, 
+                 String[] hosts, String[] topologyPaths) {
+      this.onepath = path;
+      this.offset = offset;
+      this.hosts = hosts;
+      this.length = len;
+      assert (hosts.length == topologyPaths.length ||
+              topologyPaths.length == 0);
+
+      // if the file system does not have any rack information, then
+      // use dummy rack location.
+      if (topologyPaths.length == 0) {
+        topologyPaths = new String[hosts.length];
+        for (int i = 0; i < topologyPaths.length; i++) {
+          topologyPaths[i] = (new NodeBase(hosts[i], 
+                              NetworkTopology.DEFAULT_RACK)).toString();
+        }
+      }
+
+      // The topology paths have the host name included as the last 
+      // component. Strip it.
+      this.racks = new String[topologyPaths.length];
+      for (int i = 0; i < topologyPaths.length; i++) {
+        this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation();
+      }
+    }
+  }
+
+  private static void addHostToRack(String rack, String host) {
+    Set<String> hosts = rackToNodes.get(rack);
+    if (hosts == null) {
+      hosts = new HashSet<String>();
+      rackToNodes.put(rack, hosts);
+    }
+    hosts.add(host);
+  }
+  
+  private static List<String> getHosts(List<String> racks) {
+    List<String> hosts = new ArrayList<String>();
+    for (String rack : racks) {
+      hosts.addAll(rackToNodes.get(rack));
+    }
+    return hosts;
+  }
+  
+  /**
+   * Accept a path only if at least one of the filters given in the
+   * constructor does. 
+   */
+  private static class MultiPathFilter implements PathFilter {
+    private List<PathFilter> filters;
+
+    public MultiPathFilter() {
+      this.filters = new ArrayList<PathFilter>();
+    }
+
+    public MultiPathFilter(List<PathFilter> filters) {
+      this.filters = filters;
+    }
+
+    public void add(PathFilter one) {
+      filters.add(one);
+    }
+
+    public boolean accept(Path path) {
+      for (PathFilter filter : filters) {
+        if (filter.accept(path)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    public String toString() {
+      StringBuffer buf = new StringBuffer();
+      buf.append("[");
+      for (PathFilter f: filters) {
+        buf.append(f);
+        buf.append(",");
+      }
+      buf.append("]");
+      return buf.toString();
+    }
+  }
+}
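The packing above is driven by three thresholds: maxSize caps how much block data goes into one split, minSizeNode decides whether a node's leftover blocks become a split of their own, and minSizeRack plays the same role per rack before blocks fall through to the overflow pass. A minimal driver-side sketch of tuning these knobs follows; the configuration key names are assumptions carried over from the older org.apache.hadoop.mapred.lib.CombineFileInputFormat, since the part of getSplits() that reads them is not in this hunk.

import org.apache.hadoop.conf.Configuration;

public class CombineSplitSizes {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Assumed key names (not visible in this hunk):
    // cap each combined split at 256 MB of block data ...
    conf.setLong("mapred.max.split.size", 256L * 1024 * 1024);
    // ... emit a node-local split once a node's leftovers reach 128 MB ...
    conf.setLong("mapred.min.split.size.per.node", 128L * 1024 * 1024);
    // ... and a rack-local split once a rack's leftovers reach 192 MB;
    // anything smaller is pooled and combined in the overflow pass.
    conf.setLong("mapred.min.split.size.per.rack", 192L * 1024 * 1024);
  }
}

If all three are left at their default of 0, none of the size checks in getMoreSplits() fire and the remaining blocks end up in a single split.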

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReader.java?rev=779939&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReader.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReader.java Fri May 29 11:59:03 2009
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.*;
+import java.lang.reflect.*;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A generic RecordReader that can hand out different recordReaders
+ * for each chunk in a {@link CombineFileSplit}.
+ * A CombineFileSplit can combine data chunks from multiple files. 
+ * This class allows using different RecordReaders for processing
+ * these data chunks from different files.
+ * @see CombineFileSplit
+ */
+
+public class CombineFileRecordReader<K, V> extends RecordReader<K, V> {
+
+  static final Class<?>[] constructorSignature = new Class<?>[] 
+                                         {CombineFileSplit.class,
+                                          TaskAttemptContext.class,
+                                          Integer.class};
+
+  protected CombineFileSplit split;
+  protected Class<? extends RecordReader<K,V>> rrClass;
+  protected Constructor<? extends RecordReader<K,V>> rrConstructor;
+  protected FileSystem fs;
+  protected TaskAttemptContext context;
+  
+  protected int idx;
+  protected long progress;
+  protected RecordReader<K, V> curReader;
+  
+  public void initialize(InputSplit split,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    this.split = (CombineFileSplit)split;
+    this.context = context;
+  }
+  
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+
+    while ((curReader == null) || !curReader.nextKeyValue()) {
+      if (!initNextRecordReader()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public K getCurrentKey() throws IOException, InterruptedException {
+    return curReader.getCurrentKey();
+  }
+  
+  public V getCurrentValue() throws IOException, InterruptedException {
+    return curReader.getCurrentValue();
+  }
+  
+  public void close() throws IOException {
+    if (curReader != null) {
+      curReader.close();
+      curReader = null;
+    }
+  }
+  
+  /**
+   * return progress based on the amount of data processed so far.
+   */
+  public float getProgress() throws IOException {
+    return Math.min(1.0f,  progress/(float)(split.getLength()));
+  }
+  
+  /**
+   * Construct a reader that hands out a new instance of rrClass for each
+   * chunk in the given CombineFileSplit.
+   */
+  public CombineFileRecordReader(CombineFileSplit split,
+                                 TaskAttemptContext context,
+                                 Class<? extends RecordReader<K,V>> rrClass)
+    throws IOException {
+    this.split = split;
+    this.context = context;
+    this.rrClass = rrClass;
+    this.idx = 0;
+    this.curReader = null;
+    this.progress = 0;
+
+    try {
+      rrConstructor = rrClass.getDeclaredConstructor(constructorSignature);
+      rrConstructor.setAccessible(true);
+    } catch (Exception e) {
+      throw new RuntimeException(rrClass.getName() + 
+                                 " does not have valid constructor", e);
+    }
+    initNextRecordReader();
+  }
+  
+  /**
+   * Get the record reader for the next chunk in this CombineFileSplit.
+   */
+  protected boolean initNextRecordReader() throws IOException {
+
+    if (curReader != null) {
+      curReader.close();
+      curReader = null;
+      if (idx > 0) {
+        progress += split.getLength(idx-1);    // done processing so far
+      }
+    }
+
+    // if all chunks have been processed, nothing more to do.
+    if (idx == split.getNumPaths()) {
+      return false;
+    }
+
+    // get a record reader for the idx-th chunk
+    try {
+      curReader =  rrConstructor.newInstance(new Object [] 
+                            {split, context, Integer.valueOf(idx)});
+
+      Configuration conf = context.getConfiguration();
+      // setup some helper config variables.
+      conf.set("map.input.file", split.getPath(idx).toString());
+      conf.setLong("map.input.start", split.getOffset(idx));
+      conf.setLong("map.input.length", split.getLength(idx));
+    } catch (Exception e) {
+      throw new RuntimeException (e);
+    }
+    idx++;
+    return true;
+  }
+}
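The reflective constructorSignature above defines the contract for per-chunk readers: they must expose a (CombineFileSplit, TaskAttemptContext, Integer) constructor, and since initNextRecordReader() never calls initialize() on them, any setup has to happen in that constructor. A sketch of a concrete input format wired up this way is below; the class names are hypothetical, and the per-chunk reader simply delegates the idx-th chunk to the stock new-API LineRecordReader via a single-file FileSplit.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

/** Hypothetical input format: combines small text files, read line by line. */
public class CombinedLineInputFormat
    extends CombineFileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException {
    // Delegate to CombineFileRecordReader, which creates one ChunkLineReader
    // per chunk of the CombineFileSplit.
    return new CombineFileRecordReader<LongWritable, Text>(
        (CombineFileSplit) split, context, ChunkLineReader.class);
  }

  /**
   * Per-chunk reader. It must have the (CombineFileSplit, TaskAttemptContext,
   * Integer) constructor that CombineFileRecordReader instantiates reflectively.
   */
  public static class ChunkLineReader extends RecordReader<LongWritable, Text> {
    private final LineRecordReader delegate = new LineRecordReader();

    public ChunkLineReader(CombineFileSplit split, TaskAttemptContext context,
                           Integer index) throws IOException {
      // CombineFileRecordReader never calls initialize() on chunk readers,
      // so set up the delegate here, scoped to the index-th chunk.
      FileSplit fileSplit = new FileSplit(split.getPath(index),
          split.getOffset(index), split.getLength(index),
          split.getLocations());
      delegate.initialize(fileSplit, context);
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) {
      // Already initialized in the constructor; nothing to do.
    }

    @Override
    public boolean nextKeyValue() throws IOException {
      return delegate.nextKeyValue();
    }

    @Override
    public LongWritable getCurrentKey() {
      return delegate.getCurrentKey();
    }

    @Override
    public Text getCurrentValue() {
      return delegate.getCurrentValue();
    }

    @Override
    public float getProgress() throws IOException {
      return delegate.getProgress();
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }
  }
}

A job would then only need job.setInputFormatClass(CombinedLineInputFormat.class); the split packing and the per-chunk reader switching are handled by the two classes in this patch.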

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileSplit.java?rev=779939&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileSplit.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileSplit.java Fri May 29 11:59:03 2009
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+
+/**
+ * A sub-collection of input files. 
+ * 
+ * Unlike {@link FileSplit}, the CombineFileSplit class does not represent 
+ * a split of a single file; rather, it groups the input files into smaller sets. 
+ * A split may contain blocks from different files, but all 
+ * the blocks in the same split are probably local to some rack. <br> 
+ * CombineFileSplit can be used to implement a {@link RecordReader}, 
+ * for example reading one record per file.
+ * 
+ * @see FileSplit
+ * @see CombineFileInputFormat 
+ */
+public class CombineFileSplit extends InputSplit implements Writable {
+
+  private Path[] paths;
+  private long[] startoffset;
+  private long[] lengths;
+  private String[] locations;
+  private long totLength;
+
+  /**
+   * default constructor
+   */
+  public CombineFileSplit() {}
+
+  public CombineFileSplit(Path[] files, long[] start, 
+                          long[] lengths, String[] locations) {
+    initSplit(files, start, lengths, locations);
+  }
+
+  public CombineFileSplit(Path[] files, long[] lengths) {
+    long[] startoffset = new long[files.length];
+    for (int i = 0; i < startoffset.length; i++) {
+      startoffset[i] = 0;
+    }
+    String[] locations = new String[files.length];
+    for (int i = 0; i < locations.length; i++) {
+      locations[i] = "";
+    }
+    initSplit(files, startoffset, lengths, locations);
+  }
+  
+  private void initSplit(Path[] files, long[] start, 
+                         long[] lengths, String[] locations) {
+    this.startoffset = start;
+    this.lengths = lengths;
+    this.paths = files;
+    this.totLength = 0;
+    this.locations = locations;
+    for(long length : lengths) {
+      totLength += length;
+    }
+  }
+
+  /**
+   * Copy constructor
+   */
+  public CombineFileSplit(CombineFileSplit old) throws IOException {
+    this(old.getPaths(), old.getStartOffsets(),
+         old.getLengths(), old.getLocations());
+  }
+
+  public long getLength() {
+    return totLength;
+  }
+
+  /** Returns an array containing the start offsets of the files in the split*/ 
+  public long[] getStartOffsets() {
+    return startoffset;
+  }
+  
+  /** Returns an array containing the lengths of the files in the split*/ 
+  public long[] getLengths() {
+    return lengths;
+  }
+
+  /** Returns the start offset of the i<sup>th</sup> Path */
+  public long getOffset(int i) {
+    return startoffset[i];
+  }
+  
+  /** Returns the length of the i<sup>th</sup> Path */
+  public long getLength(int i) {
+    return lengths[i];
+  }
+  
+  /** Returns the number of Paths in the split */
+  public int getNumPaths() {
+    return paths.length;
+  }
+
+  /** Returns the i<sup>th</sup> Path */
+  public Path getPath(int i) {
+    return paths[i];
+  }
+  
+  /** Returns all the Paths in the split */
+  public Path[] getPaths() {
+    return paths;
+  }
+
+  /** Returns all the hosts (locations) where this input split resides */
+  public String[] getLocations() throws IOException {
+    return locations;
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    totLength = in.readLong();
+    int arrLength = in.readInt();
+    lengths = new long[arrLength];
+    for(int i=0; i<arrLength;i++) {
+      lengths[i] = in.readLong();
+    }
+    int filesLength = in.readInt();
+    paths = new Path[filesLength];
+    for(int i=0; i<filesLength;i++) {
+      paths[i] = new Path(Text.readString(in));
+    }
+    arrLength = in.readInt();
+    startoffset = new long[arrLength];
+    for(int i=0; i<arrLength;i++) {
+      startoffset[i] = in.readLong();
+    }
+  }
+
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(totLength);
+    out.writeInt(lengths.length);
+    for(long length : lengths) {
+      out.writeLong(length);
+    }
+    out.writeInt(paths.length);
+    for(Path p : paths) {
+      Text.writeString(out, p.toString());
+    }
+    out.writeInt(startoffset.length);
+    for(long length : startoffset) {
+      out.writeLong(length);
+    }
+  }
+  
+  @Override
+  public String toString() {
+    StringBuffer sb = new StringBuffer();
+    for (int i = 0; i < paths.length; i++) {
+      if (i == 0) {
+        sb.append("Paths:");
+      }
+      sb.append(paths[i].toUri().getPath() + ":" + startoffset[i] +
+                "+" + lengths[i]);
+      if (i < paths.length - 1) {
+        sb.append(",");
+      }
+    }
+    if (locations != null) {
+      StringBuffer locsb = new StringBuffer();
+      for (int i = 0; i < locations.length; i++) {
+        locsb.append(locations[i] + ":");
+      }
+      sb.append(" Locations:" + locsb.toString() + "; ");
+    }
+    return sb.toString();
+  }
+}
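Since the class is Writable, the write()/readFields() pair above lets the framework ship splits to tasks. A small round-trip sketch (paths and sizes made up) shows what survives serialization; note that write() does not emit the locations array, so getLocations() on a deserialized copy returns null.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class CombineFileSplitRoundTrip {
  public static void main(String[] args) throws Exception {
    Path[] files = { new Path("/data/a.txt"), new Path("/data/b.txt") };
    long[] starts = { 0L, 0L };
    long[] lengths = { 1024L, 2048L };
    String[] hosts = { "host1", "host2" };
    CombineFileSplit split = new CombineFileSplit(files, starts, lengths, hosts);

    // serialize the split into a byte buffer
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    split.write(new DataOutputStream(bytes));

    // deserialize into a fresh instance created with the default constructor
    CombineFileSplit copy = new CombineFileSplit();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray())));

    // totLength and the per-chunk arrays survive the round trip;
    // locations are not serialized, so copy.getLocations() is null.
    System.out.println(copy.getLength());      // 3072
    System.out.println(copy.getNumPaths());    // 2
    System.out.println(copy.getPath(1));       // /data/b.txt
  }
}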


