hadoop-common-commits mailing list archives

From tomwh...@apache.org
Subject svn commit: r1235548 [3/8] - in /hadoop/common/branches/branch-1: ./ src/core/org/apache/hadoop/conf/ src/core/org/apache/hadoop/io/ src/mapred/org/apache/hadoop/mapreduce/ src/mapred/org/apache/hadoop/mapreduce/lib/db/ src/mapred/org/apache/hadoop/map...
Date Tue, 24 Jan 2012 23:22:01 GMT
Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,693 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.net.NetworkTopology;
+
+/**
+ * An abstract {@link InputFormat} that returns {@link CombineFileSplit}'s in the
+ * {@link InputFormat#getSplits(JobContext)} method.
+ * 
+ * Splits are constructed from the files under the input paths. 
+ * A split cannot have files from different pools.
+ * Each split returned may contain blocks from different files.
+ * If a maxSplitSize is specified, then blocks on the same node are
+ * combined to form a single split. Blocks that are left over are
+ * then combined with other blocks in the same rack. 
+ * If maxSplitSize is not specified, then blocks from the same rack
+ * are combined in a single split; no attempt is made to create
+ * node-local splits.
+ * If the maxSplitSize is equal to the block size, then this class
+ * is similar to the default splitting behavior in Hadoop: each
+ * block is a locally processed split.
+ * Subclasses implement 
+ * {@link InputFormat#createRecordReader(InputSplit, TaskAttemptContext)}
+ * to construct <code>RecordReader</code>'s for 
+ * <code>CombineFileSplit</code>'s.
+ * 
+ * @see CombineFileSplit
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public abstract class CombineFileInputFormat<K, V>
+  extends FileInputFormat<K, V> {
+
+  public static final String SPLIT_MINSIZE_PERNODE = 
+    "mapreduce.input.fileinputformat.split.minsize.per.node";
+  public static final String SPLIT_MINSIZE_PERRACK = 
+    "mapreduce.input.fileinputformat.split.minsize.per.rack";
+  // ability to limit the size of a single split
+  private long maxSplitSize = 0;
+  private long minSplitSizeNode = 0;
+  private long minSplitSizeRack = 0;
+
+  // A list of input path filter pools. A split cannot have blocks from files
+  // across multiple pools.
+  private ArrayList<MultiPathFilter> pools = new  ArrayList<MultiPathFilter>();
+
+  // mapping from a rack name to the set of Nodes in the rack 
+  private HashMap<String, Set<String>> rackToNodes = 
+                            new HashMap<String, Set<String>>();
+  /**
+   * Specify the maximum size (in bytes) of each split. Each split is
+   * approximately equal to the specified size.
+   */
+  protected void setMaxSplitSize(long maxSplitSize) {
+    this.maxSplitSize = maxSplitSize;
+  }
+
+  /**
+   * Specify the minimum size (in bytes) of each split per node.
+   * This applies to data that is left over after combining data on a single
+   * node into splits that are of maximum size specified by maxSplitSize.
+   * This leftover data will be combined into its own split if its size
+   * exceeds minSplitSizeNode.
+   */
+  protected void setMinSplitSizeNode(long minSplitSizeNode) {
+    this.minSplitSizeNode = minSplitSizeNode;
+  }
+
+  /**
+   * Specify the minimum size (in bytes) of each split per rack.
+   * This applies to data that is left over after combining data on a single
+   * rack into splits that are of maximum size specified by maxSplitSize.
+   * This leftover data will be combined into its own split if its size
+   * exceeds minSplitSizeRack.
+   */
+  protected void setMinSplitSizeRack(long minSplitSizeRack) {
+    this.minSplitSizeRack = minSplitSizeRack;
+  }
+
+  /**
+   * Create a new pool and add the filters to it.
+   * A split cannot have files from different pools.
+   */
+  protected void createPool(List<PathFilter> filters) {
+    pools.add(new MultiPathFilter(filters));
+  }
+
+  /**
+   * Create a new pool and add the filters to it. 
+   * A pathname can satisfy any one of the specified filters.
+   * A split cannot have files from different pools.
+   */
+  protected void createPool(PathFilter... filters) {
+    MultiPathFilter multi = new MultiPathFilter();
+    for (PathFilter f: filters) {
+      multi.add(f);
+    }
+    pools.add(multi);
+  }
+  
+  @Override
+  protected boolean isSplitable(JobContext context, Path file) {
+    final CompressionCodec codec =
+      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
+    return codec == null;
+  }
+
+  /**
+   * default constructor
+   */
+  public CombineFileInputFormat() {
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext job) 
+    throws IOException {
+
+    long minSizeNode = 0;
+    long minSizeRack = 0;
+    long maxSize = 0;
+    Configuration conf = job.getConfiguration();
+
+    // the values specified by setXxxSplitSize() take precedence over the
+    // values that might have been specified in the config
+    if (minSplitSizeNode != 0) {
+      minSizeNode = minSplitSizeNode;
+    } else {
+      minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
+    }
+    if (minSplitSizeRack != 0) {
+      minSizeRack = minSplitSizeRack;
+    } else {
+      minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
+    }
+    if (maxSplitSize != 0) {
+      maxSize = maxSplitSize;
+    } else {
+      maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
+    }
+    if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
+      throw new IOException("Minimum split size pernode " + minSizeNode +
+                            " cannot be larger than maximum split size " +
+                            maxSize);
+    }
+    if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
+      throw new IOException("Minimum split size per rack" + minSizeRack +
+                            " cannot be larger than maximum split size " +
+                            maxSize);
+    }
+    if (minSizeRack != 0 && minSizeNode > minSizeRack) {
+      throw new IOException("Minimum split size per node" + minSizeNode +
+                            " cannot be smaller than minimum split " +
+                            "size per rack " + minSizeRack);
+    }
+
+    // all the files in input set
+    Path[] paths = FileUtil.stat2Paths(
+                     listStatus(job).toArray(new FileStatus[0]));
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    if (paths.length == 0) {
+      return splits;    
+    }
+
+    // Convert them to Paths first. This is a costly operation, so do it
+    // once up front rather than repeating it for every pool in the
+    // next loop.
+    List<Path> newpaths = new LinkedList<Path>();
+    for (int i = 0; i < paths.length; i++) {
+      Path p = new Path(paths[i].toUri().getPath());
+      newpaths.add(p);
+    }
+    paths = null;
+
+    // In one single iteration, process all the paths in a single pool.
+    // Processing one pool at a time ensures that a split contains paths
+    // from a single pool only.
+    for (MultiPathFilter onepool : pools) {
+      ArrayList<Path> myPaths = new ArrayList<Path>();
+      
+      // pick one input path. If it matches all the filters in a pool,
+      // add it to the output set
+      for (Iterator<Path> iter = newpaths.iterator(); iter.hasNext();) {
+        Path p = iter.next();
+        if (onepool.accept(p)) {
+          myPaths.add(p); // add it to my output set
+          iter.remove();
+        }
+      }
+      // create splits for all files in this pool.
+      getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]), 
+                    maxSize, minSizeNode, minSizeRack, splits);
+    }
+
+    // create splits for all files that are not in any pool.
+    getMoreSplits(job, newpaths.toArray(new Path[newpaths.size()]), 
+                  maxSize, minSizeNode, minSizeRack, splits);
+
+    // free up rackToNodes map
+    rackToNodes.clear();
+    return splits;    
+  }
+
+  /**
+   * Return all the splits in the specified set of paths
+   */
+  private void getMoreSplits(JobContext job, Path[] paths, 
+                             long maxSize, long minSizeNode, long minSizeRack,
+                             List<InputSplit> splits)
+    throws IOException {
+    Configuration conf = job.getConfiguration();
+
+    // all blocks for all the files in input set
+    OneFileInfo[] files;
+  
+    // mapping from a rack name to the list of blocks it has
+    HashMap<String, List<OneBlockInfo>> rackToBlocks = 
+                              new HashMap<String, List<OneBlockInfo>>();
+
+    // mapping from a block to the nodes on which it has replicas
+    HashMap<OneBlockInfo, String[]> blockToNodes = 
+                              new HashMap<OneBlockInfo, String[]>();
+
+    // mapping from a node to the list of blocks that it contains
+    HashMap<String, List<OneBlockInfo>> nodeToBlocks = 
+                              new HashMap<String, List<OneBlockInfo>>();
+    
+    files = new OneFileInfo[paths.length];
+    if (paths.length == 0) {
+      return; 
+    }
+
+    // populate all the blocks for all files
+    long totLength = 0;
+    for (int i = 0; i < paths.length; i++) {
+      files[i] = new OneFileInfo(paths[i], conf, isSplitable(job, paths[i]),
+                                 rackToBlocks, blockToNodes, nodeToBlocks,
+                                 rackToNodes, maxSize);
+      totLength += files[i].getLength();
+    }
+
+    ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
+    Set<String> nodes = new HashSet<String>();
+    long curSplitSize = 0;
+
+    // process all nodes and create splits that are local
+    // to a node. 
+    for (Iterator<Map.Entry<String, 
+         List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator(); 
+         iter.hasNext();) {
+
+      Map.Entry<String, List<OneBlockInfo>> one = iter.next();
+      nodes.add(one.getKey());
+      List<OneBlockInfo> blocksInNode = one.getValue();
+
+      // for each block, copy it into validBlocks. Delete it from 
+      // blockToNodes so that the same block does not appear in 
+      // two different splits.
+      for (OneBlockInfo oneblock : blocksInNode) {
+        if (blockToNodes.containsKey(oneblock)) {
+          validBlocks.add(oneblock);
+          blockToNodes.remove(oneblock);
+          curSplitSize += oneblock.length;
+
+          // if the accumulated split size exceeds the maximum, then 
+          // create this split.
+          if (maxSize != 0 && curSplitSize >= maxSize) {
+            // create an input split and add it to the splits array
+            addCreatedSplit(splits, nodes, validBlocks);
+            curSplitSize = 0;
+            validBlocks.clear();
+          }
+        }
+      }
+      // if there were any blocks left over and their combined size is
+      // larger than minSplitNode, then combine them into one split.
+      // Otherwise add them back to the unprocessed pool. It is likely 
+      // that they will be combined with other blocks from the 
+      // same rack later on.
+      if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
+        // create an input split and add it to the splits array
+        addCreatedSplit(splits, nodes, validBlocks);
+      } else {
+        for (OneBlockInfo oneblock : validBlocks) {
+          blockToNodes.put(oneblock, oneblock.hosts);
+        }
+      }
+      validBlocks.clear();
+      nodes.clear();
+      curSplitSize = 0;
+    }
+
+    // if blocks in a rack are below the specified minimum size, then keep them
+    // in 'overflow'. After the processing of all racks is complete, these 
+    // overflow blocks will be combined into splits.
+    ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
+    Set<String> racks = new HashSet<String>();
+
+    // Process all racks over and over again until there is no more work to do.
+    while (blockToNodes.size() > 0) {
+
+      // Create one split for this rack before moving over to the next rack. 
+      // Come back to this rack after creating a single split for each of the 
+      // remaining racks.
+      // Process one rack location at a time, combining all possible blocks
+      // that reside on this rack into one split (constrained by the minimum
+      // and maximum split sizes).
+
+      // iterate over all racks 
+      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = 
+           rackToBlocks.entrySet().iterator(); iter.hasNext();) {
+
+        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
+        racks.add(one.getKey());
+        List<OneBlockInfo> blocks = one.getValue();
+
+        // for each block, copy it into validBlocks. Delete it from 
+        // blockToNodes so that the same block does not appear in 
+        // two different splits.
+        boolean createdSplit = false;
+        for (OneBlockInfo oneblock : blocks) {
+          if (blockToNodes.containsKey(oneblock)) {
+            validBlocks.add(oneblock);
+            blockToNodes.remove(oneblock);
+            curSplitSize += oneblock.length;
+      
+            // if the accumulated split size exceeds the maximum, then 
+            // create this split.
+            if (maxSize != 0 && curSplitSize >= maxSize) {
+              // create an input split and add it to the splits array
+              addCreatedSplit(splits, getHosts(racks), validBlocks);
+              createdSplit = true;
+              break;
+            }
+          }
+        }
+
+        // if we created a split, then just go to the next rack
+        if (createdSplit) {
+          curSplitSize = 0;
+          validBlocks.clear();
+          racks.clear();
+          continue;
+        }
+
+        if (!validBlocks.isEmpty()) {
+          if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
+            // if there is a minimum size specified, then create a single split
+            // otherwise, store these blocks into overflow data structure
+            addCreatedSplit(splits, getHosts(racks), validBlocks);
+          } else {
+            // There were a few blocks in this rack that
+            // remained to be processed. Keep them in 'overflow' block list.
+            // These will be combined later.
+            overflowBlocks.addAll(validBlocks);
+          }
+        }
+        curSplitSize = 0;
+        validBlocks.clear();
+        racks.clear();
+      }
+    }
+
+    assert blockToNodes.isEmpty();
+    assert curSplitSize == 0;
+    assert validBlocks.isEmpty();
+    assert racks.isEmpty();
+
+    // Process all overflow blocks
+    for (OneBlockInfo oneblock : overflowBlocks) {
+      validBlocks.add(oneblock);
+      curSplitSize += oneblock.length;
+
+      // This might cause an existing rack location to be re-added,
+      // but it should be ok.
+      for (int i = 0; i < oneblock.racks.length; i++) {
+        racks.add(oneblock.racks[i]);
+      }
+
+      // if the accumulated split size exceeds the maximum, then 
+      // create this split.
+      if (maxSize != 0 && curSplitSize >= maxSize) {
+        // create an input split and add it to the splits array
+        addCreatedSplit(splits, getHosts(racks), validBlocks);
+        curSplitSize = 0;
+        validBlocks.clear();
+        racks.clear();
+      }
+    }
+
+    // Process any remaining blocks, if any.
+    if (!validBlocks.isEmpty()) {
+      addCreatedSplit(splits, getHosts(racks), validBlocks);
+    }
+  }
+
+  /**
+   * Create a single split from the list of blocks specified in validBlocks
+   * Add this new split into splitList.
+   */
+  private void addCreatedSplit(List<InputSplit> splitList, 
+                               Collection<String> locations, 
+                               ArrayList<OneBlockInfo> validBlocks) {
+    // create an input split
+    Path[] fl = new Path[validBlocks.size()];
+    long[] offset = new long[validBlocks.size()];
+    long[] length = new long[validBlocks.size()];
+    for (int i = 0; i < validBlocks.size(); i++) {
+      fl[i] = validBlocks.get(i).onepath; 
+      offset[i] = validBlocks.get(i).offset;
+      length[i] = validBlocks.get(i).length;
+    }
+
+     // add this split to the list that is returned
+    CombineFileSplit thissplit = new CombineFileSplit(fl, offset, 
+                                   length, locations.toArray(new String[0]));
+    splitList.add(thissplit); 
+  }
+
+  /**
+   * Not implemented here; subclasses construct RecordReaders for CombineFileSplits.
+   */
+  public abstract RecordReader<K, V> createRecordReader(InputSplit split,
+      TaskAttemptContext context) throws IOException;
+
+  /**
+   * information about one file from the File System
+   */
+  private static class OneFileInfo {
+    private long fileSize;               // size of the file
+    private OneBlockInfo[] blocks;       // all blocks in this file
+
+    OneFileInfo(Path path, Configuration conf,
+                boolean isSplitable,
+                HashMap<String, List<OneBlockInfo>> rackToBlocks,
+                HashMap<OneBlockInfo, String[]> blockToNodes,
+                HashMap<String, List<OneBlockInfo>> nodeToBlocks,
+                HashMap<String, Set<String>> rackToNodes,
+                long maxSize)
+                throws IOException {
+      this.fileSize = 0;
+
+      // get block locations from file system
+      FileSystem fs = path.getFileSystem(conf);
+      FileStatus stat = fs.getFileStatus(path);
+      BlockLocation[] locations = fs.getFileBlockLocations(stat, 0, 
+                                                           stat.getLen());
+      // create a list of all block and their locations
+      if (locations == null) {
+        blocks = new OneBlockInfo[0];
+      } else {
+        if (!isSplitable) {
+          // if the file is not splitable, just create the one block with
+          // full file length
+          blocks = new OneBlockInfo[1];
+          fileSize = stat.getLen();
+          blocks[0] = new OneBlockInfo(path, 0, fileSize, locations[0]
+              .getHosts(), locations[0].getTopologyPaths());
+        } else {
+          ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(
+              locations.length);
+          for (int i = 0; i < locations.length; i++) {
+            fileSize += locations[i].getLength();
+
+            // each split can be a maximum of maxSize
+            long left = locations[i].getLength();
+            long myOffset = locations[i].getOffset();
+            long myLength = 0;
+            while (left > 0) {
+              if (maxSize == 0) {
+                myLength = left;
+              } else {
+                if (left > maxSize && left < 2 * maxSize) {
+                  // if remainder is between max and 2*max - then
+                  // instead of creating splits of size max, left-max we
+                  // create splits of size left/2 and left/2. This is
+                  // a heuristic to avoid creating really really small
+                  // splits.
+                  myLength = left / 2;
+                } else {
+                  myLength = Math.min(maxSize, left);
+                }
+              }
+              OneBlockInfo oneblock = new OneBlockInfo(path, myOffset,
+                  myLength, locations[i].getHosts(), locations[i]
+                      .getTopologyPaths());
+              left -= myLength;
+              myOffset += myLength;
+
+              blocksList.add(oneblock);
+            }
+          }
+          blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
+        }
+
+        for (OneBlockInfo oneblock : blocks) {
+          // add this block to the block --> node locations map
+          blockToNodes.put(oneblock, oneblock.hosts);
+
+          // For blocks that do not have host/rack information,
+          // assign to default  rack.
+          String[] racks = null;
+          if (oneblock.hosts.length == 0) {
+            racks = new String[]{NetworkTopology.DEFAULT_RACK};
+          } else {
+            racks = oneblock.racks;
+          }
+
+          // add this block to the rack --> block map
+          for (int j = 0; j < racks.length; j++) {
+            String rack = racks[j];
+            List<OneBlockInfo> blklist = rackToBlocks.get(rack);
+            if (blklist == null) {
+              blklist = new ArrayList<OneBlockInfo>();
+              rackToBlocks.put(rack, blklist);
+            }
+            blklist.add(oneblock);
+            if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
+              // Add this host to rackToNodes map
+              addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
+            }
+          }
+
+          // add this block to the node --> block map
+          for (int j = 0; j < oneblock.hosts.length; j++) {
+            String node = oneblock.hosts[j];
+            List<OneBlockInfo> blklist = nodeToBlocks.get(node);
+            if (blklist == null) {
+              blklist = new ArrayList<OneBlockInfo>();
+              nodeToBlocks.put(node, blklist);
+            }
+            blklist.add(oneblock);
+          }
+        }
+      }
+    }
+
+    long getLength() {
+      return fileSize;
+    }
+
+    OneBlockInfo[] getBlocks() {
+      return blocks;
+    }
+  }
+
+  /**
+   * information about one block from the File System
+   */
+  private static class OneBlockInfo {
+    Path onepath;                // name of this file
+    long offset;                 // offset in file
+    long length;                 // length of this block
+    String[] hosts;              // nodes on which this block resides
+    String[] racks;              // network topology of hosts
+
+    OneBlockInfo(Path path, long offset, long len, 
+                 String[] hosts, String[] topologyPaths) {
+      this.onepath = path;
+      this.offset = offset;
+      this.hosts = hosts;
+      this.length = len;
+      assert (hosts.length == topologyPaths.length ||
+              topologyPaths.length == 0);
+
+      // if the file system does not have any rack information, then
+      // use dummy rack location.
+      if (topologyPaths.length == 0) {
+        topologyPaths = new String[hosts.length];
+        for (int i = 0; i < topologyPaths.length; i++) {
+          topologyPaths[i] = (new NodeBase(hosts[i], 
+                              NetworkTopology.DEFAULT_RACK)).toString();
+        }
+      }
+
+      // The topology paths have the host name included as the last 
+      // component. Strip it.
+      this.racks = new String[topologyPaths.length];
+      for (int i = 0; i < topologyPaths.length; i++) {
+        this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation();
+      }
+    }
+  }
+
+  protected BlockLocation[] getFileBlockLocations(
+    FileSystem fs, FileStatus stat) throws IOException {
+    return fs.getFileBlockLocations(stat, 0, stat.getLen());
+  }
+
+  private static void addHostToRack(HashMap<String, Set<String>> rackToNodes,
+                                    String rack, String host) {
+    Set<String> hosts = rackToNodes.get(rack);
+    if (hosts == null) {
+      hosts = new HashSet<String>();
+      rackToNodes.put(rack, hosts);
+    }
+    hosts.add(host);
+  }
+  
+  private Set<String> getHosts(Set<String> racks) {
+    Set<String> hosts = new HashSet<String>();
+    for (String rack : racks) {
+      if (rackToNodes.containsKey(rack)) {
+        hosts.addAll(rackToNodes.get(rack));
+      }
+    }
+    return hosts;
+  }
+  
+  /**
+   * Accept a path only if any one of the filters given in the
+   * constructor does.
+   */
+  private static class MultiPathFilter implements PathFilter {
+    private List<PathFilter> filters;
+
+    public MultiPathFilter() {
+      this.filters = new ArrayList<PathFilter>();
+    }
+
+    public MultiPathFilter(List<PathFilter> filters) {
+      this.filters = filters;
+    }
+
+    public void add(PathFilter one) {
+      filters.add(one);
+    }
+
+    public boolean accept(Path path) {
+      for (PathFilter filter : filters) {
+        if (filter.accept(path)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    public String toString() {
+      StringBuffer buf = new StringBuffer();
+      buf.append("[");
+      for (PathFilter f: filters) {
+        buf.append(f);
+        buf.append(",");
+      }
+      buf.append("]");
+      return buf.toString();
+    }
+  }
+}
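
For illustration only, a minimal driver-side sketch of driving the split-size
knobs above purely through the configuration when the protected setters are not
used; the job name, the input path, and the sizes chosen are hypothetical.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class CombineSplitSizeDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Cap each combined split at 256 MB. Values passed to the protected
        // setters of a CombineFileInputFormat subclass take precedence over
        // these configuration keys.
        conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 256L << 20);
        // Per-node leftovers below 64 MB are pushed up to the rack level;
        // per-rack leftovers below 128 MB fall through to the overflow pass.
        conf.setLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 64L << 20);
        conf.setLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 128L << 20);
        // getSplits() enforces per-node <= per-rack <= maxsize when all are set.

        Job job = new Job(conf, "combine-input-demo");            // hypothetical name
        FileInputFormat.setInputPaths(job, new Path("/data/in")); // hypothetical path
        // job.setInputFormatClass(...) would name a concrete subclass, such as
        // the one sketched after CombineFileRecordReader.java below.
      }
    }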

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReader.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReader.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReader.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.*;
+import java.lang.reflect.*;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A generic RecordReader that can hand out different recordReaders
+ * for each chunk in a {@link CombineFileSplit}.
+ * A CombineFileSplit can combine data chunks from multiple files. 
+ * This class allows using different RecordReaders for processing
+ * these data chunks from different files.
+ * @see CombineFileSplit
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class CombineFileRecordReader<K, V> extends RecordReader<K, V> {
+
+  static final Class [] constructorSignature = new Class [] 
+                                         {CombineFileSplit.class,
+                                          TaskAttemptContext.class,
+                                          Integer.class};
+
+  protected CombineFileSplit split;
+  protected Class<? extends RecordReader<K,V>> rrClass;
+  protected Constructor<? extends RecordReader<K,V>> rrConstructor;
+  protected FileSystem fs;
+  protected TaskAttemptContext context;
+  
+  protected int idx;
+  protected long progress;
+  protected RecordReader<K, V> curReader;
+  
+  public void initialize(InputSplit split,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    this.split = (CombineFileSplit)split;
+    this.context = context;
+    if (null != this.curReader) {
+      this.curReader.initialize(split, context);
+    }
+  }
+  
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+
+    while ((curReader == null) || !curReader.nextKeyValue()) {
+      if (!initNextRecordReader()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public K getCurrentKey() throws IOException, InterruptedException {
+    return curReader.getCurrentKey();
+  }
+  
+  public V getCurrentValue() throws IOException, InterruptedException {
+    return curReader.getCurrentValue();
+  }
+  
+  public void close() throws IOException {
+    if (curReader != null) {
+      curReader.close();
+      curReader = null;
+    }
+  }
+  
+  /**
+   * return progress based on the amount of data processed so far.
+   */
+  public float getProgress() throws IOException, InterruptedException {
+    long subprogress = 0;    // bytes processed in current split
+    if (null != curReader) {
+      // idx is always one past the current subsplit's true index.
+      subprogress = (long)(curReader.getProgress() * split.getLength(idx - 1));
+    }
+    return Math.min(1.0f,  (progress + subprogress)/(float)(split.getLength()));
+  }
+  
+  /**
+   * A generic RecordReader that can hand out different recordReaders
+   * for each chunk in the CombineFileSplit.
+   */
+  public CombineFileRecordReader(CombineFileSplit split,
+                                 TaskAttemptContext context,
+                                 Class<? extends RecordReader<K,V>> rrClass)
+    throws IOException {
+    this.split = split;
+    this.context = context;
+    this.rrClass = rrClass;
+    this.idx = 0;
+    this.curReader = null;
+    this.progress = 0;
+
+    try {
+      rrConstructor = rrClass.getDeclaredConstructor(constructorSignature);
+      rrConstructor.setAccessible(true);
+    } catch (Exception e) {
+      throw new RuntimeException(rrClass.getName() + 
+                                 " does not have valid constructor", e);
+    }
+    initNextRecordReader();
+  }
+  
+  /**
+   * Get the record reader for the next chunk in this CombineFileSplit.
+   */
+  protected boolean initNextRecordReader() throws IOException {
+
+    if (curReader != null) {
+      curReader.close();
+      curReader = null;
+      if (idx > 0) {
+        progress += split.getLength(idx-1);    // done processing so far
+      }
+    }
+
+    // if all chunks have been processed, nothing more to do.
+    if (idx == split.getNumPaths()) {
+      return false;
+    }
+
+    // get a record reader for the idx-th chunk
+    try {
+      Configuration conf = context.getConfiguration();
+      // setup some helper config variables.
+      conf.set("map.input.file", split.getPath(idx).toString());
+      conf.setLong("map.input.start", split.getOffset(idx));
+      conf.setLong("map.input.length", split.getLength(idx));
+
+      curReader =  rrConstructor.newInstance(new Object [] 
+                            {split, context, Integer.valueOf(idx)});
+
+      if (idx > 0) {
+        // initialize() for the first RecordReader will be called by MapTask;
+        // we're responsible for initializing subsequent RecordReaders.
+        curReader.initialize(split, context);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException (e);
+    }
+    idx++;
+    return true;
+  }
+}
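
The reflective constructor signature above, (CombineFileSplit, TaskAttemptContext,
Integer), is the contract a per-chunk reader must satisfy. For illustration only,
the sketch below shows the usual wiring with CombineFileInputFormat, delegating
each chunk to the new-API LineRecordReader; the names MyCombineTextInputFormat
and SingleFileLineReader are hypothetical.

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

    public class MyCombineTextInputFormat
        extends CombineFileInputFormat<LongWritable, Text> {

      public MyCombineTextInputFormat() {
        setMaxSplitSize(128L << 20);   // combine blocks into ~128 MB splits
      }

      @Override
      public RecordReader<LongWritable, Text> createRecordReader(
          InputSplit split, TaskAttemptContext context) throws IOException {
        // CombineFileRecordReader creates one SingleFileLineReader per chunk
        // through the reflective three-argument constructor.
        return new CombineFileRecordReader<LongWritable, Text>(
            (CombineFileSplit) split, context, SingleFileLineReader.class);
      }

      /** Reads a single chunk of a CombineFileSplit with a LineRecordReader. */
      public static class SingleFileLineReader
          extends RecordReader<LongWritable, Text> {

        private final LineRecordReader delegate = new LineRecordReader();
        private final FileSplit fileSplit;

        public SingleFileLineReader(CombineFileSplit split,
            TaskAttemptContext context, Integer idx) {
          // Rebuild a FileSplit for just this chunk; LineRecordReader does not
          // use the host list, so null is passed for it.
          fileSplit = new FileSplit(split.getPath(idx), split.getOffset(idx),
              split.getLength(idx), null);
        }

        @Override
        public void initialize(InputSplit ignored, TaskAttemptContext context)
            throws IOException, InterruptedException {
          // The split passed in is the whole CombineFileSplit; read only the
          // per-chunk FileSplit built in the constructor.
          delegate.initialize(fileSplit, context);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
          return delegate.nextKeyValue();
        }

        @Override
        public LongWritable getCurrentKey() throws IOException,
            InterruptedException {
          return delegate.getCurrentKey();
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
          return delegate.getCurrentValue();
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
          return delegate.getProgress();
        }

        @Override
        public void close() throws IOException {
          delegate.close();
        }
      }
    }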

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileRecordReader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileSplit.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileSplit.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileSplit.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+
+/**
+ * A sub-collection of input files. 
+ * 
+ * Unlike {@link FileSplit}, the CombineFileSplit class does not represent 
+ * a split of a file, but a split of input files into smaller sets. 
+ * A split may contain blocks from different files, but all 
+ * the blocks in the same split are probably local to some rack. <br> 
+ * CombineFileSplit can be used to implement {@link RecordReader}'s, 
+ * for example reading one record per file.
+ * 
+ * @see FileSplit
+ * @see CombineFileInputFormat 
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class CombineFileSplit extends InputSplit implements Writable {
+
+  private Path[] paths;
+  private long[] startoffset;
+  private long[] lengths;
+  private String[] locations;
+  private long totLength;
+
+  /**
+   * default constructor
+   */
+  public CombineFileSplit() {}
+  public CombineFileSplit(Path[] files, long[] start, 
+                          long[] lengths, String[] locations) {
+    initSplit(files, start, lengths, locations);
+  }
+
+  public CombineFileSplit(Path[] files, long[] lengths) {
+    long[] startoffset = new long[files.length];
+    for (int i = 0; i < startoffset.length; i++) {
+      startoffset[i] = 0;
+    }
+    String[] locations = new String[files.length];
+    for (int i = 0; i < locations.length; i++) {
+      locations[i] = "";
+    }
+    initSplit(files, startoffset, lengths, locations);
+  }
+  
+  private void initSplit(Path[] files, long[] start, 
+                         long[] lengths, String[] locations) {
+    this.startoffset = start;
+    this.lengths = lengths;
+    this.paths = files;
+    this.totLength = 0;
+    this.locations = locations;
+    for(long length : lengths) {
+      totLength += length;
+    }
+  }
+
+  /**
+   * Copy constructor
+   */
+  public CombineFileSplit(CombineFileSplit old) throws IOException {
+    this(old.getPaths(), old.getStartOffsets(),
+         old.getLengths(), old.getLocations());
+  }
+
+  public long getLength() {
+    return totLength;
+  }
+
+  /** Returns an array containing the start offsets of the files in the split*/ 
+  public long[] getStartOffsets() {
+    return startoffset;
+  }
+  
+  /** Returns an array containing the lengths of the files in the split*/ 
+  public long[] getLengths() {
+    return lengths;
+  }
+
+  /** Returns the start offset of the i<sup>th</sup> Path */
+  public long getOffset(int i) {
+    return startoffset[i];
+  }
+  
+  /** Returns the length of the i<sup>th</sup> Path */
+  public long getLength(int i) {
+    return lengths[i];
+  }
+  
+  /** Returns the number of Paths in the split */
+  public int getNumPaths() {
+    return paths.length;
+  }
+
+  /** Returns the i<sup>th</sup> Path */
+  public Path getPath(int i) {
+    return paths[i];
+  }
+  
+  /** Returns all the Paths in the split */
+  public Path[] getPaths() {
+    return paths;
+  }
+
+  /** Returns the locations (hosts) on which this input-split resides */
+  public String[] getLocations() throws IOException {
+    return locations;
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    totLength = in.readLong();
+    int arrLength = in.readInt();
+    lengths = new long[arrLength];
+    for(int i=0; i<arrLength;i++) {
+      lengths[i] = in.readLong();
+    }
+    int filesLength = in.readInt();
+    paths = new Path[filesLength];
+    for(int i=0; i<filesLength;i++) {
+      paths[i] = new Path(Text.readString(in));
+    }
+    arrLength = in.readInt();
+    startoffset = new long[arrLength];
+    for(int i=0; i<arrLength;i++) {
+      startoffset[i] = in.readLong();
+    }
+  }
+
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(totLength);
+    out.writeInt(lengths.length);
+    for(long length : lengths) {
+      out.writeLong(length);
+    }
+    out.writeInt(paths.length);
+    for(Path p : paths) {
+      Text.writeString(out, p.toString());
+    }
+    out.writeInt(startoffset.length);
+    for(long length : startoffset) {
+      out.writeLong(length);
+    }
+  }
+  
+  @Override
+  public String toString() {
+    StringBuffer sb = new StringBuffer();
+    for (int i = 0; i < paths.length; i++) {
+      if (i == 0 ) {
+        sb.append("Paths:");
+      }
+      sb.append(paths[i].toUri().getPath() + ":" + startoffset[i] +
+                "+" + lengths[i]);
+      if (i < paths.length -1) {
+        sb.append(",");
+      }
+    }
+    if (locations != null) {
+      String locs = "";
+      StringBuffer locsb = new StringBuffer();
+      for (int i = 0; i < locations.length; i++) {
+        locsb.append(locations[i] + ":");
+      }
+      locs = locsb.toString();
+      sb.append(" Locations:" + locs + "; ");
+    }
+    return sb.toString();
+  }
+}
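
For illustration only, a small sketch of constructing and inspecting a
CombineFileSplit directly; the paths, offsets, lengths, and host names are
made up.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

    public class CombineFileSplitDemo {
      public static void main(String[] args) {
        // Two chunks from two hypothetical files: 1 MB of a.txt from offset 0
        // and 2 MB of b.txt starting at offset 4 MB.
        Path[]   files   = { new Path("/data/a.txt"), new Path("/data/b.txt") };
        long[]   starts  = { 0L, 4L << 20 };
        long[]   lengths = { 1L << 20, 2L << 20 };
        String[] hosts   = { "host1", "host2" };   // hypothetical locations

        CombineFileSplit split =
            new CombineFileSplit(files, starts, lengths, hosts);
        System.out.println("total length = " + split.getLength());  // 3 MB
        for (int i = 0; i < split.getNumPaths(); i++) {
          System.out.println(split.getPath(i) + ":" + split.getOffset(i)
              + "+" + split.getLength(i));
        }
      }
    }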

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/CombineFileSplit.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/DelegatingInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/DelegatingInputFormat.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/DelegatingInputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/DelegatingInputFormat.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * An {@link InputFormat} that delegates behavior of paths to multiple other
+ * InputFormats.
+ * 
+ * @see MultipleInputs#addInputPath(Job, Path, Class, Class)
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class DelegatingInputFormat<K, V> extends InputFormat<K, V> {
+
+  @SuppressWarnings("unchecked")
+  public List<InputSplit> getSplits(JobContext job) 
+      throws IOException, InterruptedException {
+    Configuration conf = job.getConfiguration();
+    Job jobCopy =new Job(conf);
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    Map<Path, InputFormat> formatMap = 
+      MultipleInputs.getInputFormatMap(job);
+    Map<Path, Class<? extends Mapper>> mapperMap = MultipleInputs
+       .getMapperTypeMap(job);
+    Map<Class<? extends InputFormat>, List<Path>> formatPaths
+        = new HashMap<Class<? extends InputFormat>, List<Path>>();
+
+    // First, build a map of InputFormats to Paths
+    for (Entry<Path, InputFormat> entry : formatMap.entrySet()) {
+      if (!formatPaths.containsKey(entry.getValue().getClass())) {
+       formatPaths.put(entry.getValue().getClass(), new LinkedList<Path>());
+      }
+
+      formatPaths.get(entry.getValue().getClass()).add(entry.getKey());
+    }
+
+    for (Entry<Class<? extends InputFormat>, List<Path>> formatEntry : 
+        formatPaths.entrySet()) {
+      Class<? extends InputFormat> formatClass = formatEntry.getKey();
+      InputFormat format = (InputFormat) ReflectionUtils.newInstance(
+         formatClass, conf);
+      List<Path> paths = formatEntry.getValue();
+
+      Map<Class<? extends Mapper>, List<Path>> mapperPaths
+          = new HashMap<Class<? extends Mapper>, List<Path>>();
+
+      // Now, for each set of paths that have a common InputFormat, build
+      // a map of Mappers to the paths they're used for
+      for (Path path : paths) {
+       Class<? extends Mapper> mapperClass = mapperMap.get(path);
+       if (!mapperPaths.containsKey(mapperClass)) {
+         mapperPaths.put(mapperClass, new LinkedList<Path>());
+       }
+
+       mapperPaths.get(mapperClass).add(path);
+      }
+
+      // Now each set of paths that has a common InputFormat and Mapper can
+      // be added to the same job, and split together.
+      for (Entry<Class<? extends Mapper>, List<Path>> mapEntry :
+          mapperPaths.entrySet()) {
+       paths = mapEntry.getValue();
+       Class<? extends Mapper> mapperClass = mapEntry.getKey();
+
+       if (mapperClass == null) {
+         try {
+           mapperClass = job.getMapperClass();
+         } catch (ClassNotFoundException e) {
+           throw new IOException("Mapper class is not found", e);
+         }
+       }
+
+       FileInputFormat.setInputPaths(jobCopy, paths.toArray(new Path[paths
+           .size()]));
+
+       // Get splits for each input path and tag with InputFormat
+       // and Mapper types by wrapping in a TaggedInputSplit.
+       List<InputSplit> pathSplits = format.getSplits(jobCopy);
+       for (InputSplit pathSplit : pathSplits) {
+         splits.add(new TaggedInputSplit(pathSplit, conf, format.getClass(),
+             mapperClass));
+       }
+      }
+    }
+
+    return splits;
+  }
+
+  @Override
+  public RecordReader<K, V> createRecordReader(InputSplit split,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    return new DelegatingRecordReader<K, V>(split, context);
+  }
+}
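
For illustration only, a driver-side sketch of the pattern this class supports,
assuming the MultipleInputs helper referenced in the javadoc above is available
and wires DelegatingInputFormat and DelegatingMapper into the job; the paths,
the job name, and the two stub mappers are hypothetical.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class MultipleInputsDriver {

      // Turns a (byte offset, line) record into a (Text, Text) record.
      public static class PlainMapper
          extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
          context.write(new Text(key.toString()), value);
        }
      }

      // Identity mapper; its input is already (Text, Text).
      public static class KvMapper
          extends Mapper<Text, Text, Text, Text> { }

      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "multiple-inputs-demo");

        // Each path is tagged with its own InputFormat and Mapper.
        // DelegatingInputFormat groups the paths by (InputFormat, Mapper),
        // computes splits per group, and wraps each split in a
        // TaggedInputSplit so DelegatingMapper can pick the right mapper.
        MultipleInputs.addInputPath(job, new Path("/data/plain"),
            TextInputFormat.class, PlainMapper.class);
        MultipleInputs.addInputPath(job, new Path("/data/keyvalue"),
            KeyValueTextInputFormat.class, KvMapper.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // ... set reducer, output path, etc., then job.waitForCompletion(true)
      }
    }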

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/DelegatingInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/DelegatingMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/DelegatingMapper.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/DelegatingMapper.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/DelegatingMapper.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A {@link Mapper} that delegates behavior of paths to multiple other
+ * mappers.
+ * 
+ * @see MultipleInputs#addInputPath(Job, Path, Class, Class)
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class DelegatingMapper<K1, V1, K2, V2> extends Mapper<K1, V1, K2, V2> {
+
+  private Mapper<K1, V1, K2, V2> mapper;
+
+  @SuppressWarnings("unchecked")
+  protected void setup(Context context)
+      throws IOException, InterruptedException {
+    // Find the Mapper from the TaggedInputSplit.
+    TaggedInputSplit inputSplit = (TaggedInputSplit) context.getInputSplit();
+    mapper = (Mapper<K1, V1, K2, V2>) ReflectionUtils.newInstance(inputSplit
+       .getMapperClass(), context.getConfiguration());
+    
+  }
+
+  @SuppressWarnings("unchecked")
+  public void run(Context context) 
+      throws IOException, InterruptedException {
+    setup(context);
+    mapper.run(context);
+    cleanup(context);
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/DelegatingMapper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/DelegatingRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/DelegatingRecordReader.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/DelegatingRecordReader.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/DelegatingRecordReader.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * This is a delegating RecordReader, which delegates the functionality to the
+ * underlying record reader in {@link TaggedInputSplit}  
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class DelegatingRecordReader<K, V> extends RecordReader<K, V> {
+  RecordReader<K, V> originalRR;
+
+  /**
+   * Constructs the DelegatingRecordReader.
+   * 
+   * @param split TaggedInputSplit object
+   * @param context TaskAttemptContext object
+   *  
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @SuppressWarnings("unchecked")
+  public DelegatingRecordReader(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    // Find the InputFormat and then the RecordReader from the
+    // TaggedInputSplit.
+    TaggedInputSplit taggedInputSplit = (TaggedInputSplit) split;
+    InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils
+        .newInstance(taggedInputSplit.getInputFormatClass(), context
+            .getConfiguration());
+    originalRR = inputFormat.createRecordReader(taggedInputSplit
+        .getInputSplit(), context);
+  }
+
+  @Override
+  public void close() throws IOException {
+    originalRR.close();
+  }
+
+  @Override
+  public K getCurrentKey() throws IOException, InterruptedException {
+    return originalRR.getCurrentKey();
+  }
+
+  @Override
+  public V getCurrentValue() throws IOException, InterruptedException {
+    return originalRR.getCurrentValue();
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return originalRR.getProgress();
+  }
+
+  @Override
+  public void initialize(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    originalRR.initialize(((TaggedInputSplit) split).getInputSplit(), context);
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    return originalRR.nextKeyValue();
+  }
+
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/DelegatingRecordReader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=1235548&r1=1235547&r2=1235548&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Tue Jan 24 23:21:58 2012
@@ -422,6 +422,7 @@ public abstract class FileInputFormat<K,
    */
   public static Path[] getInputPaths(JobContext context) {
     String dirs = context.getConfiguration().get("mapred.input.dir", "");
+    System.out.println("****" + dirs);
     String [] list = StringUtils.split(dirs);
     Path[] result = new Path[list.length];
     for (int i = 0; i < list.length; i++) {

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * This class treats a line in the input as a key/value pair separated by a 
+ * separator character. The separator can be specified in the config file 
+ * under the attribute name mapreduce.input.keyvaluelinerecordreader.key.value.separator. The default
+ * separator is the tab character ('\t').
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class KeyValueLineRecordReader extends RecordReader<Text, Text> {
+  public static final String KEY_VALUE_SEPERATOR = 
+    "mapreduce.input.keyvaluelinerecordreader.key.value.separator";
+  
+  private final LineRecordReader lineRecordReader;
+
+  private byte separator = (byte) '\t';
+
+  private Text innerValue;
+
+  private Text key;
+  
+  private Text value;
+  
+  public Class getKeyClass() { return Text.class; }
+  
+  public KeyValueLineRecordReader(Configuration conf)
+    throws IOException {
+    
+    lineRecordReader = new LineRecordReader();
+    String sepStr = conf.get(KEY_VALUE_SEPERATOR, "\t");
+    this.separator = (byte) sepStr.charAt(0);
+  }
+
+  public void initialize(InputSplit genericSplit,
+      TaskAttemptContext context) throws IOException {
+    lineRecordReader.initialize(genericSplit, context);
+  }
+  
+  public static int findSeparator(byte[] utf, int start, int length, 
+      byte sep) {
+    for (int i = start; i < (start + length); i++) {
+      if (utf[i] == sep) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  public static void setKeyValue(Text key, Text value, byte[] line,
+      int lineLen, int pos) {
+    if (pos == -1) {
+      key.set(line, 0, lineLen);
+      value.set("");
+    } else {
+      key.set(line, 0, pos);
+      value.set(line, pos + 1, lineLen - pos - 1);
+    }
+  }
+  /** Read a key/value pair from a line. */
+  public synchronized boolean nextKeyValue()
+    throws IOException {
+    byte[] line = null;
+    int lineLen = -1;
+    if (lineRecordReader.nextKeyValue()) {
+      innerValue = lineRecordReader.getCurrentValue();
+      line = innerValue.getBytes();
+      lineLen = innerValue.getLength();
+    } else {
+      return false;
+    }
+    if (line == null)
+      return false;
+    if (key == null) {
+      key = new Text();
+    }
+    if (value == null) {
+      value = new Text();
+    }
+    int pos = findSeparator(line, 0, lineLen, this.separator);
+    setKeyValue(key, value, line, lineLen, pos);
+    return true;
+  }
+  
+  public Text getCurrentKey() {
+    return key;
+  }
+
+  public Text getCurrentValue() {
+    return value;
+  }
+
+  public float getProgress() throws IOException {
+    return lineRecordReader.getProgress();
+  }
+  
+  public synchronized void close() throws IOException { 
+    lineRecordReader.close();
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java
------------------------------------------------------------------------------
    svn:eol-style = native
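
For illustration only (not part of the patch above), here is a minimal standalone sketch of how the two static helpers, findSeparator() and setKeyValue(), split a line; the sample line and the tab separator are made up:

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;

    public class KeyValueSplitDemo {
      public static void main(String[] args) {
        // Sample line "hostname<TAB>datanode-17"; any line and separator byte would do.
        Text line = new Text("hostname\tdatanode-17");
        Text key = new Text();
        Text value = new Text();
        // Find the first occurrence of the separator byte within the line.
        int pos = KeyValueLineRecordReader.findSeparator(
            line.getBytes(), 0, line.getLength(), (byte) '\t');
        // Split the line into key and value around that position
        // (pos == -1 makes the whole line the key and leaves the value empty).
        KeyValueLineRecordReader.setKeyValue(
            key, value, line.getBytes(), line.getLength(), pos);
        System.out.println(key + " -> " + value);   // prints: hostname -> datanode-17
      }
    }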

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * An {@link InputFormat} for plain text files. Files are broken into lines.
+ * Either line feed or carriage return is used to signal the end of a line. 
+ * Each line is divided into key and value parts by a separator byte. If no
+ * such byte exists, the key will be the entire line and the value will be empty.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class KeyValueTextInputFormat extends FileInputFormat<Text, Text> {
+
+  @Override
+  protected boolean isSplitable(JobContext context, Path file) {
+    final CompressionCodec codec =
+      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
+    return codec == null;
+  }
+
+  public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit,
+      TaskAttemptContext context) throws IOException {
+    
+    context.setStatus(genericSplit.toString());
+    return new KeyValueLineRecordReader(context.getConfiguration());
+  }
+
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native
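
A hedged sketch (not in the patch) of driver code using the new KeyValueTextInputFormat; the job name, the '=' separator, the use of args[0] as the input directory, and the omitted mapper/reducer wiring are all placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

    public class KeyValueTextJob {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Split each input line on '=' instead of the default tab.
        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "=");
        Job job = new Job(conf, "key-value-text-example");
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setMapOutputKeyClass(Text.class);     // keys and values both arrive as Text
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Mapper, reducer and output path would be configured here before submitting.
      }
    }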

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/MultipleInputs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/MultipleInputs.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/MultipleInputs.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/MultipleInputs.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * This class supports MapReduce jobs that have multiple input paths with
+ * a different {@link InputFormat} and {@link Mapper} for each path 
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class MultipleInputs {
+  public static final String DIR_FORMATS = 
+    "mapreduce.input.multipleinputs.dir.formats";
+  public static final String DIR_MAPPERS = 
+    "mapreduce.input.multipleinputs.dir.mappers";
+  
+  /**
+   * Add a {@link Path} with a custom {@link InputFormat} to the list of
+   * inputs for the map-reduce job.
+   * 
+   * @param job The {@link Job}
+   * @param path {@link Path} to be added to the list of inputs for the job
+   * @param inputFormatClass {@link InputFormat} class to use for this path
+   */
+  @SuppressWarnings("unchecked")
+  public static void addInputPath(Job job, Path path,
+      Class<? extends InputFormat> inputFormatClass) {
+    String inputFormatMapping = path.toString() + ";"
+       + inputFormatClass.getName();
+    Configuration conf = job.getConfiguration();
+    String inputFormats = conf.get(DIR_FORMATS);
+    conf.set(DIR_FORMATS,
+       inputFormats == null ? inputFormatMapping : inputFormats + ","
+           + inputFormatMapping);
+
+    job.setInputFormatClass(DelegatingInputFormat.class);
+  }
+
+  /**
+   * Add a {@link Path} with a custom {@link InputFormat} and
+   * {@link Mapper} to the list of inputs for the map-reduce job.
+   * 
+   * @param job The {@link Job}
+   * @param path {@link Path} to be added to the list of inputs for the job
+   * @param inputFormatClass {@link InputFormat} class to use for this path
+   * @param mapperClass {@link Mapper} class to use for this path
+   */
+  @SuppressWarnings("unchecked")
+  public static void addInputPath(Job job, Path path,
+      Class<? extends InputFormat> inputFormatClass,
+      Class<? extends Mapper> mapperClass) {
+
+    addInputPath(job, path, inputFormatClass);
+    Configuration conf = job.getConfiguration();
+    String mapperMapping = path.toString() + ";" + mapperClass.getName();
+    String mappers = conf.get(DIR_MAPPERS);
+    conf.set(DIR_MAPPERS, mappers == null ? mapperMapping
+       : mappers + "," + mapperMapping);
+
+    job.setMapperClass(DelegatingMapper.class);
+  }
+
+  /**
+   * Retrieves a map of {@link Path}s to the {@link InputFormat} class
+   * that should be used for them.
+   * 
+   * @param job The {@link JobContext}
+   * @see #addInputPath(Job, Path, Class)
+   * @return A map of paths to inputformats for the job
+   */
+  @SuppressWarnings("unchecked")
+  static Map<Path, InputFormat> getInputFormatMap(JobContext job) {
+    Map<Path, InputFormat> m = new HashMap<Path, InputFormat>();
+    Configuration conf = job.getConfiguration();
+    String[] pathMappings = conf.get(DIR_FORMATS).split(",");
+    for (String pathMapping : pathMappings) {
+      String[] split = pathMapping.split(";");
+      InputFormat inputFormat;
+      try {
+       inputFormat = (InputFormat) ReflectionUtils.newInstance(conf
+           .getClassByName(split[1]), conf);
+      } catch (ClassNotFoundException e) {
+       throw new RuntimeException(e);
+      }
+      m.put(new Path(split[0]), inputFormat);
+    }
+    return m;
+  }
+
+  /**
+   * Retrieves a map of {@link Path}s to the {@link Mapper} class that
+   * should be used for them.
+   * 
+   * @param job The {@link JobContext}
+   * @see #addInputPath(Job, Path, Class, Class)
+   * @return A map of paths to mappers for the job
+   */
+  @SuppressWarnings("unchecked")
+  static Map<Path, Class<? extends Mapper>> 
+      getMapperTypeMap(JobContext job) {
+    Configuration conf = job.getConfiguration();
+    if (conf.get(DIR_MAPPERS) == null) {
+      return Collections.emptyMap();
+    }
+    Map<Path, Class<? extends Mapper>> m = 
+      new HashMap<Path, Class<? extends Mapper>>();
+    String[] pathMappings = conf.get(DIR_MAPPERS).split(",");
+    for (String pathMapping : pathMappings) {
+      String[] split = pathMapping.split(";");
+      Class<? extends Mapper> mapClass;
+      try {
+       mapClass = 
+         (Class<? extends Mapper>) conf.getClassByName(split[1]);
+      } catch (ClassNotFoundException e) {
+       throw new RuntimeException(e);
+      }
+      m.put(new Path(split[0]), mapClass);
+    }
+    return m;
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/MultipleInputs.java
------------------------------------------------------------------------------
    svn:eol-style = native
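
For illustration (not part of the patch), a sketch of a driver that combines two inputs via MultipleInputs; the paths /data/logs and /data/properties and the two mapper classes are made-up examples:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class MultipleInputsJob {

      // Illustrative mapper for plain text input: (LongWritable offset, Text line).
      public static class LogLineMapper
          extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
          context.write(value, new IntWritable(1));
        }
      }

      // Illustrative mapper for key/value text input: (Text key, Text value).
      public static class PropertyMapper
          extends Mapper<Text, Text, Text, IntWritable> {
        @Override
        protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
          context.write(key, new IntWritable(1));
        }
      }

      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "multiple-inputs-example");
        // Each path gets its own InputFormat and Mapper; addInputPath() records the
        // mapping in the configuration and installs the delegating classes itself.
        MultipleInputs.addInputPath(job, new Path("/data/logs"),
            TextInputFormat.class, LogLineMapper.class);
        MultipleInputs.addInputPath(job, new Path("/data/properties"),
            KeyValueTextInputFormat.class, PropertyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Reducer, output types and output path would be configured here as usual.
      }
    }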

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.LineReader;
+
+/**
+ * NLineInputFormat which splits N lines of input as one split.
+ *
+ * In many "pleasantly" parallel applications, each process/mapper 
+ * processes the same input file (s), but with computations are 
+ * controlled by different parameters.(Referred to as "parameter sweeps").
+ * One way to achieve this, is to specify a set of parameters 
+ * (one set per line) as input in a control file 
+ * (which is the input path to the map-reduce application,
+ * where as the input dataset is specified 
+ * via a config variable in JobConf.).
+ * 
+ * The NLineInputFormat can be used in such applications, that splits 
+ * the input file such that by default, one line is fed as
+ * a value to one map task, and key is the offset.
+ * i.e. (k,v) is (LongWritable, Text).
+ * The location hints will span the whole mapred cluster.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class NLineInputFormat extends FileInputFormat<LongWritable, Text> { 
+  public static final String LINES_PER_MAP = 
+    "mapreduce.input.lineinputformat.linespermap";
+
+  public RecordReader<LongWritable, Text> createRecordReader(
+      InputSplit genericSplit, TaskAttemptContext context) 
+      throws IOException {
+    context.setStatus(genericSplit.toString());
+    return new LineRecordReader();
+  }
+
+  /** 
+   * Logically splits the set of input files for the job, splits N lines
+   * of the input as one split.
+   * 
+   * @see FileInputFormat#getSplits(JobContext)
+   */
+  public List<InputSplit> getSplits(JobContext job)
+  throws IOException {
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    int numLinesPerSplit = getNumLinesPerSplit(job);
+    for (FileStatus status : listStatus(job)) {
+      splits.addAll(getSplitsForFile(status,
+        job.getConfiguration(), numLinesPerSplit));
+    }
+    return splits;
+  }
+  
+  public static List<FileSplit> getSplitsForFile(FileStatus status,
+      Configuration conf, int numLinesPerSplit) throws IOException {
+    List<FileSplit> splits = new ArrayList<FileSplit> ();
+    Path fileName = status.getPath();
+    if (status.isDir()) {
+      throw new IOException("Not a file: " + fileName);
+    }
+    FileSystem  fs = fileName.getFileSystem(conf);
+    LineReader lr = null;
+    try {
+      FSDataInputStream in  = fs.open(fileName);
+      lr = new LineReader(in, conf);
+      Text line = new Text();
+      int numLines = 0;
+      long begin = 0;
+      long length = 0;
+      int num = -1;
+      while ((num = lr.readLine(line)) > 0) {
+        numLines++;
+        length += num;
+        if (numLines == numLinesPerSplit) {
+          // NLineInputFormat uses LineRecordReader, which always reads
+          // (and consumes) at least one character out of its upper split
+          // boundary. So to make sure that each mapper gets N lines, we
+          // move back the upper split limits of each split 
+          // by one character here.
+          if (begin == 0) {
+            splits.add(new FileSplit(fileName, begin, length - 1,
+              new String[] {}));
+          } else {
+            splits.add(new FileSplit(fileName, begin - 1, length,
+              new String[] {}));
+          }
+          begin += length;
+          length = 0;
+          numLines = 0;
+        }
+      }
+      if (numLines != 0) {
+        splits.add(new FileSplit(fileName, begin, length, new String[]{}));
+      }
+    } finally {
+      if (lr != null) {
+        lr.close();
+      }
+    }
+    return splits; 
+  }
+  
+  /**
+   * Set the number of lines per split
+   * @param job the job to modify
+   * @param numLines the number of lines per split
+   */
+  public static void setNumLinesPerSplit(Job job, int numLines) {
+    job.getConfiguration().setInt(LINES_PER_MAP, numLines);
+  }
+
+  /**
+   * Get the number of lines per split
+   * @param job the job
+   * @return the number of lines per split
+   */
+  public static int getNumLinesPerSplit(JobContext job) {
+    return job.getConfiguration().getInt(LINES_PER_MAP, 1);
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native
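
A minimal sketch (not in the patch) of the parameter-sweep setup the class comment describes; the job name, the choice of ten lines per split, and args[0] standing in for the control file are all illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;

    public class ParameterSweepJob {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "parameter-sweep");
        job.setInputFormatClass(NLineInputFormat.class);
        // Hand ten lines of the control file to each map task instead of the default one.
        NLineInputFormat.setNumLinesPerSplit(job, 10);
        // args[0] stands in for the control file holding one parameter set per line.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Mapper, output settings and job submission omitted.
      }
    }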

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsBinaryInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsBinaryInputFormat.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsBinaryInputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsBinaryInputFormat.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * InputFormat reading keys, values from SequenceFiles in binary (raw)
+ * format.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class SequenceFileAsBinaryInputFormat
+    extends SequenceFileInputFormat<BytesWritable,BytesWritable> {
+
+  public SequenceFileAsBinaryInputFormat() {
+    super();
+  }
+
+  public RecordReader<BytesWritable,BytesWritable> createRecordReader(
+      InputSplit split, TaskAttemptContext context)
+      throws IOException {
+    return new SequenceFileAsBinaryRecordReader();
+  }
+
+  /**
+   * Read records from a SequenceFile as binary (raw) bytes.
+   */
+  public static class SequenceFileAsBinaryRecordReader
+      extends RecordReader<BytesWritable,BytesWritable> {
+    private SequenceFile.Reader in;
+    private long start;
+    private long end;
+    private boolean done = false;
+    private DataOutputBuffer buffer = new DataOutputBuffer();
+    private SequenceFile.ValueBytes vbytes;
+    private BytesWritable key = null;
+    private BytesWritable value = null;
+
+    public void initialize(InputSplit split, TaskAttemptContext context) 
+        throws IOException, InterruptedException {
+      Path path = ((FileSplit)split).getPath();
+      Configuration conf = context.getConfiguration();
+      FileSystem fs = path.getFileSystem(conf);
+      this.in = new SequenceFile.Reader(fs, path, conf);
+      this.end = ((FileSplit)split).getStart() + split.getLength();
+      if (((FileSplit)split).getStart() > in.getPosition()) {
+        in.sync(((FileSplit)split).getStart());    // sync to start
+      }
+      this.start = in.getPosition();
+      vbytes = in.createValueBytes();
+      done = start >= end;
+    }
+    
+    @Override
+    public BytesWritable getCurrentKey() 
+        throws IOException, InterruptedException {
+      return key;
+    }
+    
+    @Override
+    public BytesWritable getCurrentValue() 
+        throws IOException, InterruptedException {
+      return value;
+    }
+
+    /**
+     * Retrieve the name of the key class for this SequenceFile.
+     * @see org.apache.hadoop.io.SequenceFile.Reader#getKeyClassName
+     */
+    public String getKeyClassName() {
+      return in.getKeyClassName();
+    }
+
+    /**
+     * Retrieve the name of the value class for this SequenceFile.
+     * @see org.apache.hadoop.io.SequenceFile.Reader#getValueClassName
+     */
+    public String getValueClassName() {
+      return in.getValueClassName();
+    }
+
+    /**
+     * Read raw bytes from a SequenceFile.
+     */
+    public synchronized boolean nextKeyValue()
+        throws IOException, InterruptedException {
+      if (done) {
+        return false;
+      }
+      long pos = in.getPosition();
+      boolean eof = -1 == in.nextRawKey(buffer);
+      if (!eof) {
+        if (key == null) {
+          key = new BytesWritable();
+        }
+        if (value == null) {
+          value = new BytesWritable();
+        }
+        key.set(buffer.getData(), 0, buffer.getLength());
+        buffer.reset();
+        in.nextRawValue(vbytes);
+        vbytes.writeUncompressedBytes(buffer);
+        value.set(buffer.getData(), 0, buffer.getLength());
+        buffer.reset();
+      }
+      return !(done = (eof || (pos >= end && in.syncSeen())));
+    }
+
+    public void close() throws IOException {
+      in.close();
+    }
+
+    /**
+     * Return the progress within the input split
+     * @return 0.0 to 1.0 of the input byte range
+     */
+    public float getProgress() throws IOException, InterruptedException {
+      if (end == start) {
+        return 0.0f;
+      } else {
+        return Math.min(1.0f, (float)((in.getPosition() - start) /
+                                      (double)(end - start)));
+      }
+    }
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsBinaryInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native
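
A hedged sketch (not part of the patch) of a job that consumes a SequenceFile as raw bytes through the format above; the byte-counting mapper, job name and args[0] input path are illustrative:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat;

    public class RawBytesJob {

      // Illustrative mapper: counts the raw value bytes without deserializing them.
      public static class ByteCountMapper
          extends Mapper<BytesWritable, BytesWritable, NullWritable, LongWritable> {
        @Override
        protected void map(BytesWritable key, BytesWritable value, Context context)
            throws IOException, InterruptedException {
          context.write(NullWritable.get(), new LongWritable(value.getLength()));
        }
      }

      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "raw-bytes-example");
        job.setInputFormatClass(SequenceFileAsBinaryInputFormat.class);
        job.setMapperClass(ByteCountMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));  // existing SequenceFile input
        // Reducer and output path omitted.
      }
    }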

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsTextInputFormat.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsTextInputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsTextInputFormat.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * This class is similar to SequenceFileInputFormat, except it generates
+ * SequenceFileAsTextRecordReader which converts the input keys and values
+ * to their String forms by calling the toString() method. 
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class SequenceFileAsTextInputFormat
+  extends SequenceFileInputFormat<Text, Text> {
+
+  public SequenceFileAsTextInputFormat() {
+    super();
+  }
+
+  public RecordReader<Text, Text> createRecordReader(InputSplit split,
+      TaskAttemptContext context) throws IOException {
+    context.setStatus(split.toString());
+    return new SequenceFileAsTextRecordReader();
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsTextInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native
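
For reference, a short hedged sketch (not in the patch) of wiring this format into a job; the job name and args[0] input path are placeholders and the mapper/reducer setup is omitted:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextInputFormat;

    public class SequenceFileAsTextJob {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "seqfile-as-text");
        // Keys and values of the SequenceFile reach the mapper as Text,
        // produced by calling toString() on the original Writables.
        job.setInputFormatClass(SequenceFileAsTextInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Mapper, reducer and output path omitted.
      }
    }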

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsTextRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsTextRecordReader.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsTextRecordReader.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsTextRecordReader.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * This class converts the input keys and values to their String forms by
+ * calling the toString() method. This class is to SequenceFileAsTextInputFormat
+ * what LineRecordReader is to TextInputFormat.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class SequenceFileAsTextRecordReader
+  extends RecordReader<Text, Text> {
+  
+  private final SequenceFileRecordReader<WritableComparable<?>, Writable>
+    sequenceFileRecordReader;
+
+  private Text key;
+  private Text value;
+
+  public SequenceFileAsTextRecordReader()
+    throws IOException {
+    sequenceFileRecordReader =
+      new SequenceFileRecordReader<WritableComparable<?>, Writable>();
+  }
+
+  public void initialize(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    sequenceFileRecordReader.initialize(split, context);
+  }
+
+  @Override
+  public Text getCurrentKey() 
+      throws IOException, InterruptedException {
+    return key;
+  }
+  
+  @Override
+  public Text getCurrentValue() 
+      throws IOException, InterruptedException {
+    return value;
+  }
+  
+  /** Read the next key/value pair from the SequenceFile. */
+  public synchronized boolean nextKeyValue() 
+      throws IOException, InterruptedException {
+    if (!sequenceFileRecordReader.nextKeyValue()) {
+      return false;
+    }
+    if (key == null) {
+      key = new Text(); 
+    }
+    if (value == null) {
+      value = new Text(); 
+    }
+    key.set(sequenceFileRecordReader.getCurrentKey().toString());
+    value.set(sequenceFileRecordReader.getCurrentValue().toString());
+    return true;
+  }
+  
+  public float getProgress() throws IOException,  InterruptedException {
+    return sequenceFileRecordReader.getProgress();
+  }
+  
+  public synchronized void close() throws IOException {
+    sequenceFileRecordReader.close();
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsTextRecordReader.java
------------------------------------------------------------------------------
    svn:eol-style = native


