hadoop-common-commits mailing list archives

From: zs...@apache.org
Subject: svn commit: r740404 - in /hadoop/core/trunk: ./ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapred/lib/ src/test/org/apache/hadoop/mapred/lib/
Date: Tue, 03 Feb 2009 20:07:26 GMT
Author: zshao
Date: Tue Feb  3 20:07:26 2009
New Revision: 740404

URL: http://svn.apache.org/viewvc?rev=740404&view=rev
Log:
HADOOP-4565. Added CombineFileInputFormat to use data locality information
to create splits. (dhruba via zshao)

Added:
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/CombineFileRecordReader.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/CombineFileSplit.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MultiFileInputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MultiFileSplit.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=740404&r1=740403&r2=740404&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Feb  3 20:07:26 2009
@@ -34,6 +34,9 @@
 
   IMPROVEMENTS
 
+    HADOOP-4565. Added CombineFileInputFormat to use data locality information
+    to create splits. (dhruba via zshao)
+
     HADOOP-4936. Improvements to TestSafeMode. (shv)
 
     HADOOP-4985. Remove unnecessary "throw IOException" declarations in

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MultiFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MultiFileInputFormat.java?rev=740404&r1=740403&r2=740404&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MultiFileInputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MultiFileInputFormat.java Tue Feb  3 20:07:26 2009
@@ -34,7 +34,9 @@
  * Subclasses implement {@link #getRecordReader(InputSplit, JobConf, Reporter)}
  * to construct <code>RecordReader</code>'s for <code>MultiFileSplit</code>'s.
  * @see MultiFileSplit
+ * @deprecated Use {@link org.apache.hadoop.mapred.lib.CombineFileInputFormat} instead
  */
+@Deprecated
 public abstract class MultiFileInputFormat<K, V>
   extends FileInputFormat<K, V> {
 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MultiFileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MultiFileSplit.java?rev=740404&r1=740403&r2=740404&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MultiFileSplit.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MultiFileSplit.java Tue Feb  3 20:07:26 2009
@@ -29,6 +29,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.CombineFileSplit;
 
 /**
  * A sub-collection of input files. Unlike {@link FileSplit}, MultiFileSplit 
@@ -38,60 +40,21 @@
  * reading one record per file.
  * @see FileSplit
  * @see MultiFileInputFormat 
+ * @deprecated Use {@link org.apache.hadoop.mapred.lib.CombineFileSplit} instead
  */
-public class MultiFileSplit implements InputSplit {
-
-  private Path[] paths;
-  private long[] lengths;
-  private long totLength;
-  private JobConf job;
+@Deprecated
+public class MultiFileSplit extends CombineFileSplit {
 
   MultiFileSplit() {}
   
   public MultiFileSplit(JobConf job, Path[] files, long[] lengths) {
-    this.job = job;
-    this.lengths = lengths;
-    this.paths = files;
-    this.totLength = 0;
-    for(long length : lengths) {
-      totLength += length;
-    }
-  }
-
-  public long getLength() {
-    return totLength;
-  }
-  
-  /** Returns an array containing the lengths of the files in 
-   * the split*/ 
-  public long[] getLengths() {
-    return lengths;
-  }
-  
-  /** Returns the length of the i<sup>th</sup> Path */
-  public long getLength(int i) {
-    return lengths[i];
-  }
-  
-  /** Returns the number of Paths in the split */
-  public int getNumPaths() {
-    return paths.length;
-  }
-
-  /** Returns the i<sup>th</sup> Path */
-  public Path getPath(int i) {
-    return paths[i];
-  }
-  
-  /** Returns all the Paths in the split */
-  public Path[] getPaths() {
-    return paths;
+    super(job, files, lengths);
   }
 
   public String[] getLocations() throws IOException {
     HashSet<String> hostSet = new HashSet<String>();
-    for (Path file : paths) {
-      FileSystem fs = file.getFileSystem(job);
+    for (Path file : getPaths()) {
+      FileSystem fs = file.getFileSystem(getJob());
       FileStatus status = fs.getFileStatus(file);
       BlockLocation[] blkLocations = fs.getFileBlockLocations(status,
                                           0, status.getLen());
@@ -107,37 +70,12 @@
       set.add(s); 
   }
 
-  public void readFields(DataInput in) throws IOException {
-    totLength = in.readLong();
-    int arrLength = in.readInt();
-    lengths = new long[arrLength];
-    for(int i=0; i<arrLength;i++) {
-      lengths[i] = in.readLong();
-    }
-    int filesLength = in.readInt();
-    paths = new Path[filesLength];
-    for(int i=0; i<filesLength;i++) {
-      paths[i] = new Path(Text.readString(in));
-    }
-  }
-
-  public void write(DataOutput out) throws IOException {
-    out.writeLong(totLength);
-    out.writeInt(lengths.length);
-    for(long length : lengths)
-      out.writeLong(length);
-    out.writeInt(paths.length);
-    for(Path p : paths) {
-      Text.writeString(out, p.toString());
-    }
-  }
-  
   @Override
   public String toString() {
     StringBuffer sb = new StringBuffer();
-    for(int i=0; i < paths.length; i++) {
-      sb.append(paths[i].toUri().getPath() + ":0+" + lengths[i]);
-      if (i < paths.length -1) {
+    for(int i=0; i < getPaths().length; i++) {
+      sb.append(getPath(i).toUri().getPath() + ":0+" + getLength(i));
+      if (i < getPaths().length -1) {
         sb.append("\n");
       }
     }

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java?rev=740404&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java Tue Feb  3 20:07:26 2009
@@ -0,0 +1,589 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.net.NetworkTopology;
+
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * An abstract {@link org.apache.hadoop.mapred.InputFormat} that returns {@link CombineFileSplit}'s
+ * in {@link org.apache.hadoop.mapred.InputFormat#getSplits(JobConf, int)} method. 
+ * Splits are constructed from the files under the input paths. 
+ * A split cannot have files from different pools.
+ * Each split returned may contain blocks from different files.
+ * If a maxSplitSize is specified, then blocks on the same node are
+ * combined to form a single split. Blocks that are left over are
+ * then combined with other blocks in the same rack. 
+ * If maxSplitSize is not specified, then blocks from the same rack
+ * are combined in a single split; no attempt is made to create
+ * node-local splits.
+ * If the maxSplitSize is equal to the block size, then this class
+ * is similar to the default splitting behaviour in Hadoop: each
+ * block is a locally processed split.
+ * Subclasses implement {@link org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit, JobConf, Reporter)}
+ * to construct <code>RecordReader</code>'s for <code>CombineFileSplit</code>'s.
+ * @see CombineFileSplit
+ */
+public abstract class CombineFileInputFormat<K, V>
+  extends FileInputFormat<K, V> {
+
+  // ability to limit the size of a single split
+  private long maxSplitSize = 0;
+  private long minSplitSizeNode = 0;
+  private long minSplitSizeRack = 0;
+
+  // A pool of input path filters. A split cannot have blocks from files
+  // across multiple pools.
+  private ArrayList<MultiPathFilter> pools = new ArrayList<MultiPathFilter>();
+
+  /**
+   * Specify the maximum size (in bytes) of each split. Each split is
+   * approximately equal to the specified size.
+   */
+  protected void setMaxSplitSize(long maxSplitSize) {
+    this.maxSplitSize = maxSplitSize;
+  }
+
+  /**
+   * Specify the minimum size (in bytes) of each split per node.
+   * This applies to data that is left over after combining data on a single
+   * node into splits that are of maximum size specified by maxSplitSize.
+   * This leftover data will be combined into its own split if its size
+   * exceeds minSplitSizeNode.
+   */
+  protected void setMinSplitSizeNode(long minSplitSizeNode) {
+    this.minSplitSizeNode = minSplitSizeNode;
+  }
+
+  /**
+   * Specify the minimum size (in bytes) of each split per rack.
+   * This applies to data that is left over after combining data on a single
+   * rack into splits that are of maximum size specified by maxSplitSize.
+   * This leftover data will be combined into its own split if its size
+   * exceeds minSplitSizeRack.
+   */
+  protected void setMinSplitSizeRack(long minSplitSizeRack) {
+    this.minSplitSizeRack = minSplitSizeRack;
+  }
+
+  /**
+   * Create a new pool and add the filters to it.
+   * A split cannot have files from different pools.
+   */
+  protected void createPool(JobConf conf, List<PathFilter> filters) {
+    pools.add(new MultiPathFilter(filters));
+  }
+
+  /**
+   * Create a new pool and add the filters to it. 
+   * A pathname can satisfy any one of the specified filters.
+   * A split cannot have files from different pools.
+   */
+  protected void createPool(JobConf conf, PathFilter... filters) {
+    MultiPathFilter multi = new MultiPathFilter();
+    for (PathFilter f: filters) {
+      multi.add(f);
+    }
+    pools.add(multi);
+  }
+
+  /**
+   * default constructor
+   */
+  public CombineFileInputFormat() {
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) 
+    throws IOException {
+
+    long minSizeNode = 0;
+    long minSizeRack = 0;
+    long maxSize = 0;
+
+    // the values specified by setXxxSplitSize() take precedence over the
+    // values that might have been specified in the config
+    if (minSplitSizeNode != 0) {
+      minSizeNode = minSplitSizeNode;
+    } else {
+      minSizeNode = job.getLong("mapred.min.split.size.per.node", 0);
+    }
+    if (minSplitSizeRack != 0) {
+      minSizeRack = minSplitSizeRack;
+    } else {
+      minSizeRack = job.getLong("mapred.min.split.size.per.rack", 0);
+    }
+    if (maxSplitSize != 0) {
+      maxSize = maxSplitSize;
+    } else {
+      maxSize = job.getLong("mapred.max.split.size", 0);
+    }
+    if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
+      throw new IOException("Minimum split size pernode " + minSizeNode +
+                            " cannot be larger than maximum split size " +
+                            maxSize);
+    }
+    if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
+      throw new IOException("Minimum split size per rack" + minSizeRack +
+                            " cannot be larger than maximum split size " +
+                            maxSize);
+    }
+    if (minSizeRack != 0 && minSizeNode > minSizeRack) {
+      throw new IOException("Minimum split size per node" + minSizeNode +
+                            " cannot be smaller than minimum split size per rack " +
+                            minSizeRack);
+    }
+
+    // all the files in input set
+    Path[] paths = FileUtil.stat2Paths(listStatus(job));
+    List<CombineFileSplit> splits = new ArrayList<CombineFileSplit>();
+    if (paths.length == 0) {
+      return splits.toArray(new CombineFileSplit[splits.size()]);    
+    }
+
+    // In a single iteration, process all the paths in a single pool.
+    // Processing one pool at a time ensures that a split contains paths
+    // from a single pool only.
+    for (MultiPathFilter onepool : pools) {
+      ArrayList<Path> myPaths = new ArrayList<Path>();
+      
+      // pick one input path. If it matches any one of the filters in the pool,
+      // add it to the output set
+      for (int i = 0; i < paths.length; i++) {
+        if (paths[i] == null) {  // already processed
+          continue;
+        }
+        FileSystem fs = paths[i].getFileSystem(job);
+        Path p = new Path(paths[i].toUri().getPath());
+        if (onepool.accept(p)) {
+          myPaths.add(paths[i]); // add it to my output set
+          paths[i] = null;       // already processed
+        }
+      }
+      // create splits for all files in this pool.
+      getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]), 
+                    maxSize, minSizeNode, minSizeRack, splits);
+    }
+
+    // Finally, process all paths that do not belong to any pool.
+    ArrayList<Path> myPaths = new ArrayList<Path>();
+    for (int i = 0; i < paths.length; i++) {
+      if (paths[i] == null) {  // already processed
+        continue;
+      }
+      myPaths.add(paths[i]);
+    }
+    // create splits for all files that are not in any pool.
+    getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]), 
+                  maxSize, minSizeNode, minSizeRack, splits);
+
+    return splits.toArray(new CombineFileSplit[splits.size()]);    
+  }
+
+  /**
+   * Return all the splits in the specified set of paths
+   */
+  private void getMoreSplits(JobConf job, Path[] paths, 
+                             long maxSize, long minSizeNode, long minSizeRack,
+                             List<CombineFileSplit> splits)
+    throws IOException {
+
+    // all blocks for all the files in input set
+    OneFileInfo[] files;
+  
+    // mapping from a rack name to the list of blocks it has
+    HashMap<String, List<OneBlockInfo>> rackToBlocks = 
+                              new HashMap<String, List<OneBlockInfo>>();
+
+    // mapping from a block to the nodes on which it has replicas
+    HashMap<OneBlockInfo, String[]> blockToNodes = 
+                              new HashMap<OneBlockInfo, String[]>();
+
+    // mapping from a node to the list of blocks that it contains
+    HashMap<String, List<OneBlockInfo>> nodeToBlocks = 
+                              new HashMap<String, List<OneBlockInfo>>();
+    
+    files = new OneFileInfo[paths.length];
+    if (paths.length == 0) {
+      return; 
+    }
+
+    // populate all the blocks for all files
+    long totLength = 0;
+    for (int i = 0; i < paths.length; i++) {
+      files[i] = new OneFileInfo(paths[i], job, 
+                                 rackToBlocks, blockToNodes, nodeToBlocks);
+      totLength += files[i].getLength();
+    }
+
+    ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
+    ArrayList<String> nodes = new ArrayList<String>();
+    long curSplitSize = 0;
+
+    // process all nodes and create splits that are local
+    // to a node. 
+    for (Iterator<Map.Entry<String, 
+         List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator(); 
+         iter.hasNext();) {
+
+      Map.Entry<String, List<OneBlockInfo>> one = iter.next();
+      nodes.add(one.getKey());
+      List<OneBlockInfo> blocksInNode = one.getValue();
+
+      // for each block, copy it into validBlocks. Delete it from 
+      // blockToNodes so that the same block does not appear in 
+      // two different splits.
+      for (OneBlockInfo oneblock : blocksInNode) {
+        if (blockToNodes.containsKey(oneblock)) {
+          validBlocks.add(oneblock);
+          blockToNodes.remove(oneblock);
+          curSplitSize += oneblock.length;
+
+          // if the accumulated split size exceeds the maximum, then 
+          // create this split.
+          if (maxSize != 0 && curSplitSize >= maxSize) {
+            // create an input split and add it to the splits array
+            addCreatedSplit(job, splits, nodes, validBlocks);
+            curSplitSize = 0;
+            validBlocks.clear();
+          }
+        }
+      }
+      // if there were any blocks left over and their combined size is
+      // larger than minSizeNode, then combine them into one split.
+      // Otherwise add them back to the unprocessed pool. It is likely 
+      // that they will be combined with other blocks from the same rack later on.
+      if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
+        // create an input split and add it to the splits array
+        addCreatedSplit(job, splits, nodes, validBlocks);
+      } else {
+        for (OneBlockInfo oneblock : validBlocks) {
+          blockToNodes.put(oneblock, oneblock.hosts);
+        }
+      }
+      validBlocks.clear();
+      nodes.clear();
+      curSplitSize = 0;
+    }
+
+    // if blocks in a rack are below the specified minimum size, then keep them
+    // in 'overflow'. After the processing of all racks is complete, these overflow
+    // blocks will be combined into splits.
+    ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
+    ArrayList<String> racks = new ArrayList<String>();
+
+    // Process all racks over and over again until there is no more work to do.
+    while (blockToNodes.size() > 0) {
+
+      // Create one split for this rack before moving over to the next rack. 
+      // Come back to this rack after creating a single split for each of the 
+      // remaining racks.
+      // Process one rack location at a time, combining all possible blocks
+      // that reside on this rack into one split (constrained by the minimum
+      // and maximum split sizes).
+
+      // iterate over all racks 
+      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = 
+           rackToBlocks.entrySet().iterator(); iter.hasNext();) {
+
+        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
+        racks.add(one.getKey());
+        List<OneBlockInfo> blocks = one.getValue();
+
+        // for each block, copy it into validBlocks. Delete it from 
+        // blockToNodes so that the same block does not appear in 
+        // two different splits.
+        boolean createdSplit = false;
+        for (OneBlockInfo oneblock : blocks) {
+          if (blockToNodes.containsKey(oneblock)) {
+            validBlocks.add(oneblock);
+            blockToNodes.remove(oneblock);
+            curSplitSize += oneblock.length;
+      
+            // if the accumulated split size exceeds the maximum, then 
+            // create this split.
+            if (maxSize != 0 && curSplitSize >= maxSize) {
+              // create an input split and add it to the splits array
+              addCreatedSplit(job, splits, racks, validBlocks);
+              createdSplit = true;
+              break;
+            }
+          }
+        }
+
+        // if we created a split, then just go to the next rack
+        if (createdSplit) {
+          curSplitSize = 0;
+          validBlocks.clear();
+          racks.clear();
+          continue;
+        }
+
+        if (!validBlocks.isEmpty()) {
+          if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
+            // if there is a minimum size specified, then create a single split;
+            // otherwise, store these blocks in the overflow data structure
+            addCreatedSplit(job, splits, racks, validBlocks);
+          } else {
+            // There were a few blocks in this rack that remained to be processed.
+            // Keep them in 'overflow' block list. These will be combined later.
+            overflowBlocks.addAll(validBlocks);
+          }
+        }
+        curSplitSize = 0;
+        validBlocks.clear();
+        racks.clear();
+      }
+    }
+
+    assert blockToNodes.isEmpty();
+    assert curSplitSize == 0;
+    assert validBlocks.isEmpty();
+    assert racks.isEmpty();
+
+    // Process all overflow blocks
+    for (OneBlockInfo oneblock : overflowBlocks) {
+      validBlocks.add(oneblock);
+      curSplitSize += oneblock.length;
+
+      // This might cause an existing rack location to be re-added,
+      // but it should be ok.
+      for (int i = 0; i < oneblock.racks.length; i++) {
+        racks.add(oneblock.racks[i]);
+      }
+
+      // if the accumulated split size exceeds the maximum, then 
+      // create this split.
+      if (maxSize != 0 && curSplitSize >= maxSize) {
+        // create an input split and add it to the splits array
+        addCreatedSplit(job, splits, racks, validBlocks);
+        curSplitSize = 0;
+        validBlocks.clear();
+        racks.clear();
+      }
+    }
+
+    // Process any remaining blocks.
+    if (!validBlocks.isEmpty()) {
+      addCreatedSplit(job, splits, racks, validBlocks);
+    }
+  }
+
+  /**
+   * Create a single split from the list of blocks specified in validBlocks.
+   * Add this new split to splitList.
+   */
+  private void addCreatedSplit(JobConf job,
+                               List<CombineFileSplit> splitList, 
+                               List<String> racks, 
+                               ArrayList<OneBlockInfo> validBlocks) {
+    // create an input split
+    Path[] fl = new Path[validBlocks.size()];
+    long[] offset = new long[validBlocks.size()];
+    long[] length = new long[validBlocks.size()];
+    String[] rackLocations = racks.toArray(new String[racks.size()]);
+    for (int i = 0; i < validBlocks.size(); i++) {
+      fl[i] = validBlocks.get(i).onepath; 
+      offset[i] = validBlocks.get(i).offset;
+      length[i] = validBlocks.get(i).length;
+    }
+
+     // add this split to the list that is returned
+    CombineFileSplit thissplit = new CombineFileSplit(job, fl, offset, 
+                                                      length, rackLocations);
+    splitList.add(thissplit); 
+  }
+
+  /**
+   * Subclasses must implement this method to construct a <code>RecordReader</code>
+   * for the chunks in a <code>CombineFileSplit</code>.
+   */
+  public abstract RecordReader<K, V> getRecordReader(InputSplit split,
+                                      JobConf job, Reporter reporter)
+    throws IOException;
+
+  /**
+   * information about one file from the File System
+   */
+  private static class OneFileInfo {
+    private long fileSize;               // size of the file
+    private OneBlockInfo[] blocks;       // all blocks in this file
+
+    OneFileInfo(Path path, JobConf job,
+                HashMap<String, List<OneBlockInfo>> rackToBlocks,
+                HashMap<OneBlockInfo, String[]> blockToNodes,
+                HashMap<String, List<OneBlockInfo>> nodeToBlocks)
+                throws IOException {
+      this.fileSize = 0;
+
+      // get block locations from file system
+      FileSystem fs = path.getFileSystem(job);
+      FileStatus stat = fs.getFileStatus(path);
+      BlockLocation[] locations = fs.getFileBlockLocations(stat, 0, 
+                                                           stat.getLen());
+      // create a list of all blocks and their locations
+      if (locations == null) {
+        blocks = new OneBlockInfo[0];
+      } else {
+        blocks = new OneBlockInfo[locations.length];
+        for (int i = 0; i < locations.length; i++) {
+           
+          fileSize += locations[i].getLength();
+          OneBlockInfo oneblock =  new OneBlockInfo(path, 
+                                       locations[i].getOffset(), 
+                                       locations[i].getLength(),
+                                       locations[i].getHosts(),
+                                       locations[i].getTopologyPaths()); 
+          blocks[i] = oneblock;
+
+          // add this block to the block --> node locations map
+          blockToNodes.put(oneblock, oneblock.hosts);
+
+          // add this block to the rack --> block map
+          for (int j = 0; j < oneblock.racks.length; j++) {
+            String rack = oneblock.racks[j];
+            List<OneBlockInfo> blklist = rackToBlocks.get(rack);
+            if (blklist == null) {
+              blklist = new ArrayList<OneBlockInfo>();
+              rackToBlocks.put(rack, blklist);
+            }
+            blklist.add(oneblock);
+          }
+
+          // add this block to the node --> block map
+          for (int j = 0; j < oneblock.hosts.length; j++) {
+            String node = oneblock.hosts[j];
+            List<OneBlockInfo> blklist = nodeToBlocks.get(node);
+            if (blklist == null) {
+              blklist = new ArrayList<OneBlockInfo>();
+              nodeToBlocks.put(node, blklist);
+            }
+            blklist.add(oneblock);
+          }
+        }
+      }
+    }
+
+    long getLength() {
+      return fileSize;
+    }
+
+    OneBlockInfo[] getBlocks() {
+      return blocks;
+    }
+  }
+
+  /**
+   * information about one block from the File System
+   */
+  private static class OneBlockInfo {
+    Path onepath;                // name of this file
+    long offset;                 // offset in file
+    long length;                 // length of this block
+    String[] hosts;              // nodes on which this block resides
+    String[] racks;              // network topology of hosts
+
+    OneBlockInfo(Path path, long offset, long len, 
+                 String[] hosts, String[] topologyPaths) {
+      this.onepath = path;
+      this.offset = offset;
+      this.hosts = hosts;
+      this.length = len;
+      assert (hosts.length == topologyPaths.length ||
+              topologyPaths.length == 0);
+
+      // if the file system does not have any rack information, then
+      // use a dummy rack location.
+      if (topologyPaths.length == 0) {
+        topologyPaths = new String[hosts.length];
+        for (int i = 0; i < topologyPaths.length; i++) {
+          topologyPaths[i] = (new NodeBase(hosts[i], NetworkTopology.DEFAULT_RACK)).
+                                          toString();
+        }
+      }
+
+      // The topology paths have the host name included as the last 
+      // component. Strip it.
+      this.racks = new String[topologyPaths.length];
+      for (int i = 0; i < topologyPaths.length; i++) {
+        this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation();
+      }
+    }
+  }
+
+  /**
+   * Accept a path only if any one of the filters given in the
+   * constructor does.
+   */
+  private static class MultiPathFilter implements PathFilter {
+    private List<PathFilter> filters;
+
+    public MultiPathFilter() {
+      this.filters = new ArrayList<PathFilter>();
+    }
+
+    public MultiPathFilter(List<PathFilter> filters) {
+      this.filters = filters;
+    }
+
+    public void add(PathFilter one) {
+      filters.add(one);
+    }
+
+    public boolean accept(Path path) {
+      for (PathFilter filter : filters) {
+        if (filter.accept(path)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    public String toString() {
+      StringBuffer buf = new StringBuffer();
+      buf.append("[");
+      for (PathFilter f: filters) {
+        buf.append(f);
+        buf.append(",");
+      }
+      buf.append("]");
+      return buf.toString();
+    }
+  }
+}
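
For orientation, here is a minimal sketch of how a job might subclass the new CombineFileInputFormat. It is illustrative only: the class name MyCombineInputFormat, the particular byte limits, and the ChunkLineReader it hands to CombineFileRecordReader are assumptions, not part of this commit (a ChunkLineReader sketch follows the CombineFileRecordReader.java diff below).

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

public class MyCombineInputFormat extends CombineFileInputFormat<LongWritable, Text> {

  public MyCombineInputFormat() {
    // Cap each split at roughly 128 MB of block data. Per-node leftovers of at
    // least 32 MB still become node-local splits; anything smaller falls
    // through to rack-level (and finally overflow) combining.
    setMaxSplitSize(128L * 1024 * 1024);
    setMinSplitSizeNode(32L * 1024 * 1024);
    setMinSplitSizeRack(32L * 1024 * 1024);
  }

  @Override
  public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
      JobConf job, Reporter reporter) throws IOException {
    // Hand each chunk of the CombineFileSplit to a per-chunk reader
    // (ChunkLineReader is a hypothetical class; see the sketch further below).
    @SuppressWarnings("unchecked")
    Class<RecordReader<LongWritable, Text>> readerClass =
        (Class<RecordReader<LongWritable, Text>>) (Class<?>) ChunkLineReader.class;
    return new CombineFileRecordReader<LongWritable, Text>(
        job, (CombineFileSplit) split, reporter, readerClass);
  }
}

A constructor like this would also be the natural place to call createPool when, say, files from different directories must never end up in the same split.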

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/CombineFileRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/CombineFileRecordReader.java?rev=740404&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/CombineFileRecordReader.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/CombineFileRecordReader.java Tue Feb  3 20:07:26 2009
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.*;
+import java.util.*;
+import java.lang.reflect.*;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A generic RecordReader that can hand out different RecordReaders
+ * for each chunk in a {@link CombineFileSplit}.
+ * A CombineFileSplit can combine data chunks from multiple files. 
+ * This class allows using different RecordReaders for processing
+ * these data chunks from different files.
+ * @see CombineFileSplit
+ */
+
+public class CombineFileRecordReader<K, V> implements RecordReader<K, V> {
+
+  static final Class [] constructorSignature = new Class [] 
+                                         {CombineFileSplit.class, 
+                                          Configuration.class, 
+                                          Reporter.class,
+                                          Integer.class};
+
+  protected CombineFileSplit split;
+  protected JobConf jc;
+  protected Reporter reporter;
+  protected Class<RecordReader<K, V>> rrClass;
+  protected Constructor<RecordReader<K, V>> rrConstructor;
+  protected FileSystem fs;
+  
+  protected int idx;
+  protected long progress;
+  protected RecordReader<K, V> curReader;
+  
+  public boolean next(K key, V value) throws IOException {
+
+    while ((curReader == null) || !curReader.next(key, value)) {
+      if (!initNextRecordReader()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public K createKey() {
+    return curReader.createKey();
+  }
+  
+  public V createValue() {
+    return curReader.createValue();
+  }
+  
+  /**
+   * return the amount of data processed
+   */
+  public long getPos() throws IOException {
+    return progress;
+  }
+  
+  public void close() throws IOException {
+    if (curReader != null) {
+      curReader.close();
+      curReader = null;
+    }
+  }
+  
+  /**
+   * return progress based on the amount of data processed so far.
+   */
+  public float getProgress() throws IOException {
+    return Math.min(1.0f,  progress/(float)(split.getLength()));
+  }
+  
+  /**
+   * A generic RecordReader that can hand out different RecordReaders
+   * for each chunk in the CombineFileSplit.
+   */
+  public CombineFileRecordReader(JobConf job, CombineFileSplit split, 
+                                 Reporter reporter,
+                                 Class<RecordReader<K, V>> rrClass)
+    throws IOException {
+    this.split = split;
+    this.jc = job;
+    this.rrClass = rrClass;
+    this.reporter = reporter;
+    this.idx = 0;
+    this.curReader = null;
+    this.progress = 0;
+
+    try {
+      rrConstructor = rrClass.getDeclaredConstructor(constructorSignature);
+      rrConstructor.setAccessible(true);
+    } catch (Exception e) {
+      throw new RuntimeException(rrClass.getName() + 
+                                 " does not have a valid constructor", e);
+    }
+    initNextRecordReader();
+  }
+  
+  /**
+   * Get the record reader for the next chunk in this CombineFileSplit.
+   */
+  protected boolean initNextRecordReader() throws IOException {
+
+    if (curReader != null) {
+      curReader.close();
+      curReader = null;
+      if (idx > 0) {
+        progress += split.getLength(idx-1);    // done processing so far
+      }
+    }
+
+    // if all chunks have been processed, nothing more to do.
+    if (idx == split.getNumPaths()) {
+      return false;
+    }
+
+    // get a record reader for the idx-th chunk
+    try {
+      curReader =  rrConstructor.newInstance(new Object [] 
+                            {split, jc, reporter, Integer.valueOf(idx)});
+
+      // setup some helper config variables.
+      jc.set("map.input.file", split.getPath(idx).toString());
+      jc.setLong("map.input.start", split.getOffset(idx));
+      jc.setLong("map.input.length", split.getLength(idx));
+    } catch (Exception e) {
+      throw new RuntimeException (e);
+    }
+    idx++;
+    return true;
+  }
+}
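
To make the reflective constructor contract concrete, here is a hedged sketch of a per-chunk reader that CombineFileRecordReader could instantiate. ChunkLineReader is a made-up name; the only requirements taken from the code above are the (CombineFileSplit, Configuration, Reporter, Integer) constructor signature and the RecordReader interface. It treats the idx-th chunk of the combined split as an ordinary FileSplit and delegates to the stock LineRecordReader.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

public class ChunkLineReader implements RecordReader<LongWritable, Text> {

  private final LineRecordReader delegate;

  // Constructor signature expected by CombineFileRecordReader's reflection.
  public ChunkLineReader(CombineFileSplit split, Configuration conf,
                         Reporter reporter, Integer idx) throws IOException {
    int i = idx.intValue();
    // Treat one chunk of the combined split as an ordinary FileSplit.
    FileSplit chunk = new FileSplit(split.getPath(i), split.getOffset(i),
                                    split.getLength(i), split.getLocations());
    delegate = new LineRecordReader(conf, chunk);
  }

  public boolean next(LongWritable key, Text value) throws IOException {
    return delegate.next(key, value);
  }

  public LongWritable createKey() { return delegate.createKey(); }

  public Text createValue() { return delegate.createValue(); }

  public long getPos() throws IOException { return delegate.getPos(); }

  public float getProgress() throws IOException { return delegate.getProgress(); }

  public void close() throws IOException { delegate.close(); }
}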

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/CombineFileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/CombineFileSplit.java?rev=740404&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/CombineFileSplit.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/CombineFileSplit.java Tue Feb  3 20:07:26 2009
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.io.Text;
+
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * A sub-collection of input files. Unlike {@link org.apache.hadoop.mapred.FileSplit}, 
+ * the CombineFileSplit class does not represent a split of a file, but a split of input files 
+ * into smaller sets. A split may contain blocks from different files, but all 
+ * the blocks in the same split are probably local to some rack. <br> 
+ * CombineFileSplit can be used to implement {@link org.apache.hadoop.mapred.RecordReader}'s, 
+ * reading one record per file.
+ * @see org.apache.hadoop.mapred.FileSplit
+ * @see CombineFileInputFormat 
+ */
+public class CombineFileSplit implements InputSplit {
+
+  private Path[] paths;
+  private long[] startoffset;
+  private long[] lengths;
+  private String[] locations;
+  private long totLength;
+  private JobConf job;
+
+  /**
+   * default constructor
+   */
+  public CombineFileSplit() {}
+  public CombineFileSplit(JobConf job, Path[] files, long[] start, 
+                          long[] lengths, String[] locations) {
+    initSplit(job, files, start, lengths, locations);
+  }
+
+  public CombineFileSplit(JobConf job, Path[] files, long[] lengths) {
+    long[] startoffset = new long[files.length];
+    for (int i = 0; i < startoffset.length; i++) {
+      startoffset[i] = 0;
+    }
+    String[] locations = new String[files.length];
+    for (int i = 0; i < locations.length; i++) {
+      locations[i] = "";
+    }
+    initSplit(job, files, startoffset, lengths, locations);
+  }
+  
+  private void initSplit(JobConf job, Path[] files, long[] start, 
+                         long[] lengths, String[] locations) {
+    this.job = job;
+    this.startoffset = start;
+    this.lengths = lengths;
+    this.paths = files;
+    this.totLength = 0;
+    this.locations = locations;
+    for(long length : lengths) {
+      totLength += length;
+    }
+  }
+
+  /**
+   * Copy constructor
+   */
+  public CombineFileSplit(CombineFileSplit old) throws IOException {
+    this(old.getJob(), old.getPaths(), old.getStartOffsets(),
+         old.getLengths(), old.getLocations());
+  }
+
+  public JobConf getJob() {
+    return job;
+  }
+
+  public long getLength() {
+    return totLength;
+  }
+
+  /** Returns an array containing the startoffsets of the files in the split*/ 
+  public long[] getStartOffsets() {
+    return startoffset;
+  }
+  
+  /** Returns an array containing the lengths of the files in the split*/ 
+  public long[] getLengths() {
+    return lengths;
+  }
+
+  /** Returns the start offset of the i<sup>th</sup> Path */
+  public long getOffset(int i) {
+    return startoffset[i];
+  }
+  
+  /** Returns the length of the i<sup>th</sup> Path */
+  public long getLength(int i) {
+    return lengths[i];
+  }
+  
+  /** Returns the number of Paths in the split */
+  public int getNumPaths() {
+    return paths.length;
+  }
+
+  /** Returns the i<sup>th</sup> Path */
+  public Path getPath(int i) {
+    return paths[i];
+  }
+  
+  /** Returns all the Paths in the split */
+  public Path[] getPaths() {
+    return paths;
+  }
+
+  /** Returns all the locations where this input split resides */
+  public String[] getLocations() throws IOException {
+    return locations;
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    totLength = in.readLong();
+    int arrLength = in.readInt();
+    lengths = new long[arrLength];
+    for(int i=0; i<arrLength;i++) {
+      lengths[i] = in.readLong();
+    }
+    int filesLength = in.readInt();
+    paths = new Path[filesLength];
+    for(int i=0; i<filesLength;i++) {
+      paths[i] = new Path(Text.readString(in));
+    }
+    arrLength = in.readInt();
+    startoffset = new long[arrLength];
+    for(int i=0; i<arrLength;i++) {
+      startoffset[i] = in.readLong();
+    }
+  }
+
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(totLength);
+    out.writeInt(lengths.length);
+    for(long length : lengths) {
+      out.writeLong(length);
+    }
+    out.writeInt(paths.length);
+    for(Path p : paths) {
+      Text.writeString(out, p.toString());
+    }
+    out.writeInt(startoffset.length);
+    for(long length : startoffset) {
+      out.writeLong(length);
+    }
+  }
+  
+  @Override
+  public String toString() {
+    String locs = "";
+    StringBuffer locsb = new StringBuffer();
+    if (locations != null) {
+      for (int i = 0; i < locations.length; i++) {
+        locsb.append(locations[i] + ":");
+      }
+      locs = locsb.toString();
+    }
+    StringBuffer sb = new StringBuffer();
+    for (int i = 0; i < paths.length; i++) {
+      sb.append(paths[i].toUri().getPath() + ":" + startoffset[i] +
+                "+" + lengths[i] +
+                ":" + locs
+                );
+      if (i < paths.length -1) {
+        sb.append(", ");
+      }
+    }
+    return sb.toString();
+  }
+}
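
As a small usage illustration of the data carried by a CombineFileSplit (paths, per-chunk start offsets, per-chunk lengths, and optional locations), here is a hedged sketch that builds one by hand; the paths and sizes are made up.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

public class CombineFileSplitExample {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    // two chunks drawn from two (hypothetical) files: the first 1 MB of part-0
    // and a 1 MB chunk starting at offset 2 MB in part-1
    Path[] paths   = { new Path("/data/part-0"), new Path("/data/part-1") };
    long[] offsets = { 0L, 2L * 1024 * 1024 };
    long[] lengths = { 1L * 1024 * 1024, 1L * 1024 * 1024 };
    String[] locs  = { "/default-rack", "/default-rack" };

    CombineFileSplit split = new CombineFileSplit(job, paths, offsets, lengths, locs);
    System.out.println("total length = " + split.getLength());   // 2097152
    System.out.println("chunks       = " + split.getNumPaths()); // 2
    System.out.println(split);  // path:offset+length:locations for each chunk
  }
}

The five-argument constructor shown here is the same one CombineFileInputFormat uses in addCreatedSplit when it materializes splits from the accumulated block lists.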

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java?rev=740404&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java Tue Feb  3 20:07:26 2009
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.lib;
+
+import java.io.IOException;
+import java.io.DataOutputStream;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.MiniMRCluster;
+
+public class TestCombineFileInputFormat extends TestCase{
+
+  private static final String rack1[] = new String[] {
+    "/r1"
+  };
+  private static final String hosts1[] = new String[] {
+    "host1.rack1.com"
+  };
+  private static final String rack2[] = new String[] {
+    "/r2"
+  };
+  private static final String hosts2[] = new String[] {
+    "host2.rack2.com"
+  };
+  private static final String rack3[] = new String[] {
+    "/r3"
+  };
+  private static final String hosts3[] = new String[] {
+    "host3.rack3.com"
+  };
+  final Path inDir = new Path("/racktesting");
+  final Path outputPath = new Path("/output");
+  final Path dir1 = new Path(inDir, "/dir1");
+  final Path dir2 = new Path(inDir, "/dir2");
+  final Path dir3 = new Path(inDir, "/dir3");
+  final Path dir4 = new Path(inDir, "/dir4");
+
+  static final int BLOCKSIZE = 1024;
+  static final byte[] databuf = new byte[BLOCKSIZE];
+
+  private static final Log LOG = LogFactory.getLog(TestCombineFileInputFormat.class);
+  
+  /** Dummy class to extend CombineFileInputFormat*/
+  private class DummyInputFormat extends CombineFileInputFormat<Text, Text> {
+    @Override
+    public RecordReader<Text,Text> getRecordReader(InputSplit split, JobConf job
+        , Reporter reporter) throws IOException {
+      return null;
+    }
+  }
+
+  public void testSplitPlacement() throws IOException {
+    String namenode = null;
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fileSys = null;
+    String testName = "TestSplitPlacement";
+    try {
+      /* Start 3 datanodes, one each in rack r1, r2, r3. Create three files
+       * 1) file1, just after starting the datanode on r1, with 
+       *    a repl factor of 1, and,
+       * 2) file2, just after starting the datanode on r2, with 
+       *    a repl factor of 2, and,
+       * 3) file3, after starting all three datanodes, with a repl 
+       *    factor of 3.
+       * At the end, file1 will be present on only datanode1, file2 will be
+       * present on datanode 1 and datanode2 and 
+       * file3 will be present on all datanodes. 
+       */
+      JobConf conf = new JobConf();
+      conf.setBoolean("dfs.replication.considerLoad", false);
+      dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1);
+      dfs.waitActive();
+
+      namenode = (dfs.getFileSystem()).getUri().getHost() + ":" +
+                 (dfs.getFileSystem()).getUri().getPort();
+
+      fileSys = dfs.getFileSystem();
+      if (!fileSys.mkdirs(inDir)) {
+        throw new IOException("Mkdirs failed to create " + inDir.toString());
+      }
+      writeFile(conf, new Path(dir1 + "/file1"), (short)1, 1);
+      dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
+      dfs.waitActive();
+
+      // create file on two datanodes.
+      writeFile(conf, new Path(dir2 + "/file2"), (short)2, 2);
+
+      // split it using a CombineFile input format
+      DummyInputFormat inFormat = new DummyInputFormat();
+      inFormat.setInputPaths(conf, dir1 + "," + dir2);
+      inFormat.setMinSplitSizeRack(BLOCKSIZE);
+      InputSplit[] splits = inFormat.getSplits(conf, 1);
+      System.out.println("Made splits(Test1): " + splits.length);
+
+      // make sure that each split has different locations
+      for (int i = 0; i < splits.length; ++i) {
+        CombineFileSplit fileSplit = (CombineFileSplit) splits[i];
+        System.out.println("File split(Test1): " + fileSplit);
+      }
+      assertEquals(splits.length, 2);
+
+      // create another file on 3 datanodes and 3 racks.
+      dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null);
+      dfs.waitActive();
+      writeFile(conf, new Path(dir3 + "/file3"), (short)3, 3);
+      inFormat = new DummyInputFormat();
+      inFormat.setInputPaths(conf, dir1 + "," + dir2 + "," + dir3);
+      inFormat.setMinSplitSizeRack(BLOCKSIZE);
+      splits = inFormat.getSplits(conf, 1);
+      for (int i = 0; i < splits.length; ++i) {
+        CombineFileSplit fileSplit = (CombineFileSplit) splits[i];
+        System.out.println("File split(Test2): " + fileSplit);
+      }
+      assertEquals(splits.length, 3);
+
+      // create file4 on all three racks
+      writeFile(conf, new Path(dir4 + "/file4"), (short)3, 3);
+      inFormat = new DummyInputFormat();
+      inFormat.setInputPaths(conf, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      inFormat.setMinSplitSizeRack(BLOCKSIZE);
+      splits = inFormat.getSplits(conf, 1);
+      for (int i = 0; i < splits.length; ++i) {
+        CombineFileSplit fileSplit = (CombineFileSplit) splits[i];
+        System.out.println("File split(Test3): " + fileSplit);
+      }
+      assertEquals(splits.length, 3);
+
+      // maximum split size is 2 blocks 
+      inFormat = new DummyInputFormat();
+      inFormat.setMinSplitSizeNode(BLOCKSIZE);
+      inFormat.setMaxSplitSize(2*BLOCKSIZE);
+      inFormat.setInputPaths(conf, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(conf, 1);
+      for (int i = 0; i < splits.length; ++i) {
+        CombineFileSplit fileSplit = (CombineFileSplit) splits[i];
+        System.out.println("File split(Test4): " + fileSplit);
+      }
+      assertEquals(splits.length, 5);
+
+      // maximum split size is 3 blocks 
+      inFormat = new DummyInputFormat();
+      inFormat.setMinSplitSizeNode(BLOCKSIZE);
+      inFormat.setMaxSplitSize(3*BLOCKSIZE);
+      inFormat.setInputPaths(conf, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(conf, 1);
+      for (int i = 0; i < splits.length; ++i) {
+        CombineFileSplit fileSplit = (CombineFileSplit) splits[i];
+        System.out.println("File split(Test5): " + fileSplit);
+      }
+      assertEquals(splits.length, 4);
+
+      // maximum split size is 4 blocks 
+      inFormat = new DummyInputFormat();
+      inFormat.setMaxSplitSize(4*BLOCKSIZE);
+      inFormat.setInputPaths(conf, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(conf, 1);
+      for (int i = 0; i < splits.length; ++i) {
+        CombineFileSplit fileSplit = (CombineFileSplit) splits[i];
+        System.out.println("File split(Test6): " + fileSplit);
+      }
+      assertEquals(splits.length, 3);
+
+      // maximum split size is 7 blocks and min is 3 blocks
+      inFormat = new DummyInputFormat();
+      inFormat.setMaxSplitSize(7*BLOCKSIZE);
+      inFormat.setMinSplitSizeNode(3*BLOCKSIZE);
+      inFormat.setMinSplitSizeRack(3*BLOCKSIZE);
+      inFormat.setInputPaths(conf, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(conf, 1);
+      for (int i = 0; i < splits.length; ++i) {
+        CombineFileSplit fileSplit = (CombineFileSplit) splits[i];
+        System.out.println("File split(Test7): " + fileSplit);
+      }
+      assertEquals(splits.length, 2);
+
+      // Rack 1 has file1, file2, file3 and file4
+      // Rack 2 has file2, file3 and file4
+      // Rack 3 has file3 and file4
+      Path file1 = new Path(dir1 + "/file1");
+      file1 = new Path(conf.getWorkingDirectory(), file1);
+      Path file2 = new Path(dir2 + "/file2");
+      file2 = new Path(conf.getWorkingDirectory(), file2);
+      Path file3 = new Path(dir3 + "/file3");
+      file3 = new Path(conf.getWorkingDirectory(), file3);
+      Path file4 = new Path(dir4 + "/file4");
+      file4 = new Path(conf.getWorkingDirectory(), file4);
+
+      // setup a filter so that only file1 and file2 can be combined
+      inFormat = new DummyInputFormat();
+      inFormat.addInputPath(conf, inDir);
+      inFormat.setMinSplitSizeRack(1); // everything is at least rack local
+      inFormat.createPool(conf, new TestFilter(dir1), 
+                          new TestFilter(dir2));
+      splits = inFormat.getSplits(conf, 1);
+      for (int i = 0; i < splits.length; ++i) {
+        CombineFileSplit fileSplit = (CombineFileSplit) splits[i];
+        System.out.println("File split(TestPool1): " + fileSplit);
+      }
+      assertEquals(splits.length, 3);
+    } finally {
+      if (dfs != null) {
+        dfs.shutdown();
+      }
+    }
+  }
+
+  static void writeFile(Configuration conf, Path name,
+      short replication, int numBlocks) throws IOException {
+    FileSystem fileSys = FileSystem.get(conf);
+
+    FSDataOutputStream stm = fileSys.create(name, true,
+                                            conf.getInt("io.file.buffer.size", 4096),
+                                            replication, (long)BLOCKSIZE);
+    for (int i = 0; i < numBlocks; i++) {
+      stm.write(databuf);
+    }
+    stm.close();
+    DFSTestUtil.waitReplication(fileSys, name, replication);
+  }
+
+  static class TestFilter implements PathFilter {
+    private Path p;
+
+    // store a path prefix in this TestFilter
+    public TestFilter(Path p) {
+      this.p = p;
+    }
+
+    // returns true if the specified path matches the prefix stored
+    // in this TestFilter.
+    public boolean accept(Path path) {
+      if (path.toString().indexOf(p.toString()) == 0) {
+        return true;
+      }
+      return false;
+    }
+
+    public String toString() {
+      return "PathFilter:" + p;
+    }
+  }
+
+  public static void main(String[] args) throws Exception{
+    TestCombineFileInputFormat test = new TestCombineFileInputFormat();
+    test.testSplitPlacement();
+  }
+}


