hadoop-common-commits mailing list archives

From: omal...@apache.org
Subject: svn commit: r785392 [4/5] - in /hadoop/core/branches/HADOOP-4687/mapred: conf/ lib/ src/c++/ src/c++/task-controller/ src/contrib/dynamic-scheduler/ src/contrib/sqoop/ src/contrib/streaming/ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ s...
Date: Tue, 16 Jun 2009 20:54:28 GMT
Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java Tue Jun 16 20:54:24 2009
@@ -19,28 +19,14 @@
 package org.apache.hadoop.mapred.lib;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.HashMap;
-import java.util.Set;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.net.NodeBase;
-import org.apache.hadoop.net.NetworkTopology;
 
+import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 /**
  * An abstract {@link org.apache.hadoop.mapred.InputFormat} that returns {@link CombineFileSplit}'s
@@ -60,69 +46,13 @@
  * Subclasses implement {@link org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit, JobConf, Reporter)}
  * to construct <code>RecordReader</code>'s for <code>CombineFileSplit</code>'s.
  * @see CombineFileSplit
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat}
  */
+@Deprecated
 public abstract class CombineFileInputFormat<K, V>
-  extends FileInputFormat<K, V> {
-
-  // ability to limit the size of a single split
-  private long maxSplitSize = 0;
-  private long minSplitSizeNode = 0;
-  private long minSplitSizeRack = 0;
-
-  // A pool of input paths filters. A split cannot have blocks from files
-  // across multiple pools.
-  private ArrayList<MultiPathFilter> pools = new  ArrayList<MultiPathFilter>();
-
-  /**
-   * Specify the maximum size (in bytes) of each split. Each split is
-   * approximately equal to the specified size.
-   */
-  protected void setMaxSplitSize(long maxSplitSize) {
-    this.maxSplitSize = maxSplitSize;
-  }
-
-  /**
-   * Specify the minimum size (in bytes) of each split per node.
-   * This applies to data that is left over after combining data on a single
-   * node into splits that are of maximum size specified by maxSplitSize.
-   * This leftover data will be combined into its own split if its size
-   * exceeds minSplitSizeNode.
-   */
-  protected void setMinSplitSizeNode(long minSplitSizeNode) {
-    this.minSplitSizeNode = minSplitSizeNode;
-  }
-
-  /**
-   * Specify the minimum size (in bytes) of each split per rack.
-   * This applies to data that is left over after combining data on a single
-   * rack into splits that are of maximum size specified by maxSplitSize.
-   * This leftover data will be combined into its own split if its size
-   * exceeds minSplitSizeRack.
-   */
-  protected void setMinSplitSizeRack(long minSplitSizeRack) {
-    this.minSplitSizeRack = minSplitSizeRack;
-  }
-
-  /**
-   * Create a new pool and add the filters to it.
-   * A split cannot have files from different pools.
-   */
-  protected void createPool(JobConf conf, List<PathFilter> filters) {
-    pools.add(new MultiPathFilter(filters));
-  }
-
-  /**
-   * Create a new pool and add the filters to it. 
-   * A pathname can satisfy any one of the specified filters.
-   * A split cannot have files from different pools.
-   */
-  protected void createPool(JobConf conf, PathFilter... filters) {
-    MultiPathFilter multi = new MultiPathFilter();
-    for (PathFilter f: filters) {
-      multi.add(f);
-    }
-    pools.add(multi);
-  }
+  extends org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat<K, V> 
+  implements InputFormat<K, V>{
 
   /**
    * default constructor
@@ -130,305 +60,9 @@
   public CombineFileInputFormat() {
   }
 
-  @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) 
     throws IOException {
-
-    long minSizeNode = 0;
-    long minSizeRack = 0;
-    long maxSize = 0;
-
-    // the values specified by setxxxSplitSize() takes precedence over the
-    // values that might have been specified in the config
-    if (minSplitSizeNode != 0) {
-      minSizeNode = minSplitSizeNode;
-    } else {
-      minSizeNode = job.getLong("mapred.min.split.size.per.node", 0);
-    }
-    if (minSplitSizeRack != 0) {
-      minSizeRack = minSplitSizeRack;
-    } else {
-      minSizeRack = job.getLong("mapred.min.split.size.per.rack", 0);
-    }
-    if (maxSplitSize != 0) {
-      maxSize = maxSplitSize;
-    } else {
-      maxSize = job.getLong("mapred.max.split.size", 0);
-    }
-    if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
-      throw new IOException("Minimum split size pernode " + minSizeNode +
-                            " cannot be larger than maximum split size " +
-                            maxSize);
-    }
-    if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
-      throw new IOException("Minimum split size per rack" + minSizeRack +
-                            " cannot be larger than maximum split size " +
-                            maxSize);
-    }
-    if (minSizeRack != 0 && minSizeNode > minSizeRack) {
-      throw new IOException("Minimum split size per node" + minSizeNode +
-                            " cannot be smaller than minimum split size per rack " +
-                            minSizeRack);
-    }
-
-    // all the files in input set
-    Path[] paths = FileUtil.stat2Paths(listStatus(job));
-    List<CombineFileSplit> splits = new ArrayList<CombineFileSplit>();
-    if (paths.length == 0) {
-      return splits.toArray(new CombineFileSplit[splits.size()]);    
-    }
-
-    // In one single iteration, process all the paths in a single pool.
-    // Processing one pool at a time ensures that a split contans paths
-    // from a single pool only.
-    for (MultiPathFilter onepool : pools) {
-      ArrayList<Path> myPaths = new ArrayList<Path>();
-      
-      // pick one input path. If it matches all the filters in a pool,
-      // add it to the output set
-      for (int i = 0; i < paths.length; i++) {
-        if (paths[i] == null) {  // already processed
-          continue;
-        }
-        FileSystem fs = paths[i].getFileSystem(job);
-        Path p = new Path(paths[i].toUri().getPath());
-        if (onepool.accept(p)) {
-          myPaths.add(paths[i]); // add it to my output set
-          paths[i] = null;       // already processed
-        }
-      }
-      // create splits for all files in this pool.
-      getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]), 
-                    maxSize, minSizeNode, minSizeRack, splits);
-    }
-
-    // Finally, process all paths that do not belong to any pool.
-    ArrayList<Path> myPaths = new ArrayList<Path>();
-    for (int i = 0; i < paths.length; i++) {
-      if (paths[i] == null) {  // already processed
-        continue;
-      }
-      myPaths.add(paths[i]);
-    }
-    // create splits for all files that are not in any pool.
-    getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]), 
-                  maxSize, minSizeNode, minSizeRack, splits);
-
-    return splits.toArray(new CombineFileSplit[splits.size()]);    
-  }
-
-  /**
-   * Return all the splits in the specified set of paths
-   */
-  private void getMoreSplits(JobConf job, Path[] paths, 
-                             long maxSize, long minSizeNode, long minSizeRack,
-                             List<CombineFileSplit> splits)
-    throws IOException {
-
-    // all blocks for all the files in input set
-    OneFileInfo[] files;
-  
-    // mapping from a rack name to the list of blocks it has
-    HashMap<String, List<OneBlockInfo>> rackToBlocks = 
-                              new HashMap<String, List<OneBlockInfo>>();
-
-    // mapping from a block to the nodes on which it has replicas
-    HashMap<OneBlockInfo, String[]> blockToNodes = 
-                              new HashMap<OneBlockInfo, String[]>();
-
-    // mapping from a node to the list of blocks that it contains
-    HashMap<String, List<OneBlockInfo>> nodeToBlocks = 
-                              new HashMap<String, List<OneBlockInfo>>();
-    
-    files = new OneFileInfo[paths.length];
-    if (paths.length == 0) {
-      return; 
-    }
-
-    // populate all the blocks for all files
-    long totLength = 0;
-    for (int i = 0; i < paths.length; i++) {
-      files[i] = new OneFileInfo(paths[i], job, 
-                                 rackToBlocks, blockToNodes, nodeToBlocks);
-      totLength += files[i].getLength();
-    }
-
-    ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
-    ArrayList<String> nodes = new ArrayList<String>();
-    long curSplitSize = 0;
-
-    // process all nodes and create splits that are local
-    // to a node. 
-    for (Iterator<Map.Entry<String, 
-         List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator(); 
-         iter.hasNext();) {
-
-      Map.Entry<String, List<OneBlockInfo>> one = iter.next();
-      nodes.add(one.getKey());
-      List<OneBlockInfo> blocksInNode = one.getValue();
-
-      // for each block, copy it into validBlocks. Delete it from 
-      // blockToNodes so that the same block does not appear in 
-      // two different splits.
-      for (OneBlockInfo oneblock : blocksInNode) {
-        if (blockToNodes.containsKey(oneblock)) {
-          validBlocks.add(oneblock);
-          blockToNodes.remove(oneblock);
-          curSplitSize += oneblock.length;
-
-          // if the accumulated split size exceeds the maximum, then 
-          // create this split.
-          if (maxSize != 0 && curSplitSize >= maxSize) {
-            // create an input split and add it to the splits array
-            addCreatedSplit(job, splits, nodes, validBlocks);
-            curSplitSize = 0;
-            validBlocks.clear();
-          }
-        }
-      }
-      // if there were any blocks left over and their combined size is
-      // larger than minSplitNode, then combine them into one split.
-      // Otherwise add them back to the unprocessed pool. It is likely 
-      // that they will be combined with other blocks from the same rack later on.
-      if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
-        // create an input split and add it to the splits array
-        addCreatedSplit(job, splits, nodes, validBlocks);
-      } else {
-        for (OneBlockInfo oneblock : validBlocks) {
-          blockToNodes.put(oneblock, oneblock.hosts);
-        }
-      }
-      validBlocks.clear();
-      nodes.clear();
-      curSplitSize = 0;
-    }
-
-    // if blocks in a rack are below the specified minimum size, then keep them
-    // in 'overflow'. After the processing of all racks is complete, these overflow
-    // blocks will be combined into splits.
-    ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
-    ArrayList<String> racks = new ArrayList<String>();
-
-    // Process all racks over and over again until there is no more work to do.
-    while (blockToNodes.size() > 0) {
-
-      // Create one split for this rack before moving over to the next rack. 
-      // Come back to this rack after creating a single split for each of the 
-      // remaining racks.
-      // Process one rack location at a time, Combine all possible blocks that
-      // reside on this rack as one split. (constrained by minimum and maximum
-      // split size).
-
-      // iterate over all racks 
-      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = 
-           rackToBlocks.entrySet().iterator(); iter.hasNext();) {
-
-        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
-        racks.add(one.getKey());
-        List<OneBlockInfo> blocks = one.getValue();
-
-        // for each block, copy it into validBlocks. Delete it from 
-        // blockToNodes so that the same block does not appear in 
-        // two different splits.
-        boolean createdSplit = false;
-        for (OneBlockInfo oneblock : blocks) {
-          if (blockToNodes.containsKey(oneblock)) {
-            validBlocks.add(oneblock);
-            blockToNodes.remove(oneblock);
-            curSplitSize += oneblock.length;
-      
-            // if the accumulated split size exceeds the maximum, then 
-            // create this split.
-            if (maxSize != 0 && curSplitSize >= maxSize) {
-              // create an input split and add it to the splits array
-              addCreatedSplit(job, splits, racks, validBlocks);
-              createdSplit = true;
-              break;
-            }
-          }
-        }
-
-        // if we created a split, then just go to the next rack
-        if (createdSplit) {
-          curSplitSize = 0;
-          validBlocks.clear();
-          racks.clear();
-          continue;
-        }
-
-        if (!validBlocks.isEmpty()) {
-          if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
-            // if there is a mimimum size specified, then create a single split
-            // otherwise, store these blocks into overflow data structure
-            addCreatedSplit(job, splits, racks, validBlocks);
-          } else {
-            // There were a few blocks in this rack that remained to be processed.
-            // Keep them in 'overflow' block list. These will be combined later.
-            overflowBlocks.addAll(validBlocks);
-          }
-        }
-        curSplitSize = 0;
-        validBlocks.clear();
-        racks.clear();
-      }
-    }
-
-    assert blockToNodes.isEmpty();
-    assert curSplitSize == 0;
-    assert validBlocks.isEmpty();
-    assert racks.isEmpty();
-
-    // Process all overflow blocks
-    for (OneBlockInfo oneblock : overflowBlocks) {
-      validBlocks.add(oneblock);
-      curSplitSize += oneblock.length;
-
-      // This might cause an exiting rack location to be re-added,
-      // but it should be ok.
-      for (int i = 0; i < oneblock.racks.length; i++) {
-        racks.add(oneblock.racks[i]);
-      }
-
-      // if the accumulated split size exceeds the maximum, then 
-      // create this split.
-      if (maxSize != 0 && curSplitSize >= maxSize) {
-        // create an input split and add it to the splits array
-        addCreatedSplit(job, splits, racks, validBlocks);
-        curSplitSize = 0;
-        validBlocks.clear();
-        racks.clear();
-      }
-    }
-
-    // Process any remaining blocks, if any.
-    if (!validBlocks.isEmpty()) {
-      addCreatedSplit(job, splits, racks, validBlocks);
-    }
-  }
-
-  /**
-   * Create a single split from the list of blocks specified in validBlocks
-   * Add this new split into splitList.
-   */
-  private void addCreatedSplit(JobConf job,
-                               List<CombineFileSplit> splitList, 
-                               List<String> racks, 
-                               ArrayList<OneBlockInfo> validBlocks) {
-    // create an input split
-    Path[] fl = new Path[validBlocks.size()];
-    long[] offset = new long[validBlocks.size()];
-    long[] length = new long[validBlocks.size()];
-    String[] rackLocations = racks.toArray(new String[racks.size()]);
-    for (int i = 0; i < validBlocks.size(); i++) {
-      fl[i] = validBlocks.get(i).onepath; 
-      offset[i] = validBlocks.get(i).offset;
-      length[i] = validBlocks.get(i).length;
-    }
-
-     // add this split to the list that is returned
-    CombineFileSplit thissplit = new CombineFileSplit(job, fl, offset, 
-                                                      length, rackLocations);
-    splitList.add(thissplit); 
+    return super.getSplits(new Job(job)).toArray(new InputSplit[0]);
   }
 
   /**
@@ -438,152 +72,11 @@
                                       JobConf job, Reporter reporter)
     throws IOException;
 
-  /**
-   * information about one file from the File System
-   */
-  private static class OneFileInfo {
-    private long fileSize;               // size of the file
-    private OneBlockInfo[] blocks;       // all blocks in this file
-
-    OneFileInfo(Path path, JobConf job,
-                HashMap<String, List<OneBlockInfo>> rackToBlocks,
-                HashMap<OneBlockInfo, String[]> blockToNodes,
-                HashMap<String, List<OneBlockInfo>> nodeToBlocks)
-                throws IOException {
-      this.fileSize = 0;
-
-      // get block locations from file system
-      FileSystem fs = path.getFileSystem(job);
-      FileStatus stat = fs.getFileStatus(path);
-      BlockLocation[] locations = fs.getFileBlockLocations(stat, 0, 
-                                                           stat.getLen());
-      // create a list of all block and their locations
-      if (locations == null) {
-        blocks = new OneBlockInfo[0];
-      } else {
-        blocks = new OneBlockInfo[locations.length];
-        for (int i = 0; i < locations.length; i++) {
-           
-          fileSize += locations[i].getLength();
-          OneBlockInfo oneblock =  new OneBlockInfo(path, 
-                                       locations[i].getOffset(), 
-                                       locations[i].getLength(),
-                                       locations[i].getHosts(),
-                                       locations[i].getTopologyPaths()); 
-          blocks[i] = oneblock;
-
-          // add this block to the block --> node locations map
-          blockToNodes.put(oneblock, oneblock.hosts);
-
-          // add this block to the rack --> block map
-          for (int j = 0; j < oneblock.racks.length; j++) {
-            String rack = oneblock.racks[j];
-            List<OneBlockInfo> blklist = rackToBlocks.get(rack);
-            if (blklist == null) {
-              blklist = new ArrayList<OneBlockInfo>();
-              rackToBlocks.put(rack, blklist);
-            }
-            blklist.add(oneblock);
-          }
-
-          // add this block to the node --> block map
-          for (int j = 0; j < oneblock.hosts.length; j++) {
-            String node = oneblock.hosts[j];
-            List<OneBlockInfo> blklist = nodeToBlocks.get(node);
-            if (blklist == null) {
-              blklist = new ArrayList<OneBlockInfo>();
-              nodeToBlocks.put(node, blklist);
-            }
-            blklist.add(oneblock);
-          }
-        }
-      }
-    }
-
-    long getLength() {
-      return fileSize;
-    }
-
-    OneBlockInfo[] getBlocks() {
-      return blocks;
-    }
+  // Abstract method from the superclass, implemented here to return null.
+  public org.apache.hadoop.mapreduce.RecordReader<K, V> createRecordReader(
+      org.apache.hadoop.mapreduce.InputSplit split,
+      TaskAttemptContext context) throws IOException {
+    return null;
   }
 
-  /**
-   * information about one block from the File System
-   */
-  private static class OneBlockInfo {
-    Path onepath;                // name of this file
-    long offset;                 // offset in file
-    long length;                 // length of this block
-    String[] hosts;              // nodes on whch this block resides
-    String[] racks;              // network topology of hosts
-
-    OneBlockInfo(Path path, long offset, long len, 
-                 String[] hosts, String[] topologyPaths) {
-      this.onepath = path;
-      this.offset = offset;
-      this.hosts = hosts;
-      this.length = len;
-      assert (hosts.length == topologyPaths.length ||
-              topologyPaths.length == 0);
-
-      // if the file ystem does not have any rack information, then
-      // use dummy rack location.
-      if (topologyPaths.length == 0) {
-        topologyPaths = new String[hosts.length];
-        for (int i = 0; i < topologyPaths.length; i++) {
-          topologyPaths[i] = (new NodeBase(hosts[i], NetworkTopology.DEFAULT_RACK)).
-                                          toString();
-        }
-      }
-
-      // The topology paths have the host name included as the last 
-      // component. Strip it.
-      this.racks = new String[topologyPaths.length];
-      for (int i = 0; i < topologyPaths.length; i++) {
-        this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation();
-      }
-    }
-  }
-
-  /**
-   * Accept a path only if any one of filters given in the
-   * constructor do. 
-   */
-  private static class MultiPathFilter implements PathFilter {
-    private List<PathFilter> filters;
-
-    public MultiPathFilter() {
-      this.filters = new ArrayList<PathFilter>();
-    }
-
-    public MultiPathFilter(List<PathFilter> filters) {
-      this.filters = filters;
-    }
-
-    public void add(PathFilter one) {
-      filters.add(one);
-    }
-
-    public boolean accept(Path path) {
-      for (PathFilter filter : filters) {
-        if (filter.accept(path)) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    public String toString() {
-      StringBuffer buf = new StringBuffer();
-      buf.append("[");
-      for (PathFilter f: filters) {
-        buf.append(f);
-        buf.append(",");
-      }
-      buf.append("]");
-      return buf.toString();
-    }
-  }
 }
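
The rewrite above turns the old-API CombineFileInputFormat into a thin deprecated shim: getSplits wraps the incoming JobConf in a Job, delegates to org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat, and converts the returned list back into an InputSplit array, while getRecordReader stays abstract for subclasses. A rough, self-contained sketch of a subclass written directly against the new API is below; the stub ChunkReader and the (CombineFileSplit, TaskAttemptContext, Integer) constructor that CombineFileRecordReader is assumed to look up reflectively are illustrative, not part of this commit.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class MyCombineInputFormat
    extends CombineFileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException {
    // One ChunkReader instance is created per file range in the combined split.
    return new CombineFileRecordReader<LongWritable, Text>(
        (CombineFileSplit) split, context, ChunkReader.class);
  }

  /** Stub reader for a single chunk of a CombineFileSplit; emits no records. */
  public static class ChunkReader extends RecordReader<LongWritable, Text> {
    public ChunkReader(CombineFileSplit split, TaskAttemptContext context,
                       Integer index) { }
    public void initialize(InputSplit split, TaskAttemptContext context) { }
    public boolean nextKeyValue() { return false; }   // stub: nothing to read
    public LongWritable getCurrentKey() { return null; }
    public Text getCurrentValue() { return null; }
    public float getProgress() { return 0.0f; }
    public void close() { }
  }
}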

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/CombineFileRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/CombineFileRecordReader.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/CombineFileRecordReader.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/CombineFileRecordReader.java Tue Jun 16 20:54:24 2009
@@ -19,11 +19,9 @@
 package org.apache.hadoop.mapred.lib;
 
 import java.io.*;
-import java.util.*;
 import java.lang.reflect.*;
 
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.conf.Configuration;
@@ -35,8 +33,10 @@
  * This class allows using different RecordReaders for processing
  * these data chunks from different files.
  * @see CombineFileSplit
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader}
  */
-
+@Deprecated
 public class CombineFileRecordReader<K, V> implements RecordReader<K, V> {
 
   static final Class [] constructorSignature = new Class [] 

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/CombineFileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/CombineFileSplit.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/CombineFileSplit.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/CombineFileSplit.java Tue Jun 16 20:54:24 2009
@@ -18,188 +18,30 @@
 
 package org.apache.hadoop.mapred.lib;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.io.Text;
-
 import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
 
 /**
- * A sub-collection of input files. Unlike {@link org.apache.hadoop.mapred.FileSplit}, 
- * CombineFileSplit * class does not represent a split of a file, but a split of input files 
- * into smaller sets. A split may contain blocks from different file but all 
- * the blocks in the same split are probably local to some rack <br> 
- * CombineFileSplit can be used to implement {@link org.apache.hadoop.mapred.RecordReader}'s, 
- * with reading one record per file.
- * @see org.apache.hadoop.mapred.FileSplit
- * @see CombineFileInputFormat 
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.input.CombineFileSplit}
  */
-public class CombineFileSplit implements InputSplit {
+@Deprecated
+public class CombineFileSplit extends 
+    org.apache.hadoop.mapreduce.lib.input.CombineFileSplit 
+    implements InputSplit {
 
-  private Path[] paths;
-  private long[] startoffset;
-  private long[] lengths;
-  private String[] locations;
-  private long totLength;
   private JobConf job;
 
-  /**
-   * default constructor
-   */
-  public CombineFileSplit() {}
-  public CombineFileSplit(JobConf job, Path[] files, long[] start, 
-                          long[] lengths, String[] locations) {
-    initSplit(job, files, start, lengths, locations);
+  public CombineFileSplit() {
   }
 
   public CombineFileSplit(JobConf job, Path[] files, long[] lengths) {
-    long[] startoffset = new long[files.length];
-    for (int i = 0; i < startoffset.length; i++) {
-      startoffset[i] = 0;
-    }
-    String[] locations = new String[files.length];
-    for (int i = 0; i < locations.length; i++) {
-      locations[i] = "";
-    }
-    initSplit(job, files, startoffset, lengths, locations);
-  }
-  
-  private void initSplit(JobConf job, Path[] files, long[] start, 
-                         long[] lengths, String[] locations) {
+    super(files, lengths);
     this.job = job;
-    this.startoffset = start;
-    this.lengths = lengths;
-    this.paths = files;
-    this.totLength = 0;
-    this.locations = locations;
-    for(long length : lengths) {
-      totLength += length;
-    }
-  }
-
-  /**
-   * Copy constructor
-   */
-  public CombineFileSplit(CombineFileSplit old) throws IOException {
-    this(old.getJob(), old.getPaths(), old.getStartOffsets(),
-         old.getLengths(), old.getLocations());
   }
 
   public JobConf getJob() {
     return job;
   }
-
-  public long getLength() {
-    return totLength;
-  }
-
-  /** Returns an array containing the startoffsets of the files in the split*/ 
-  public long[] getStartOffsets() {
-    return startoffset;
-  }
-  
-  /** Returns an array containing the lengths of the files in the split*/ 
-  public long[] getLengths() {
-    return lengths;
-  }
-
-  /** Returns the start offset of the i<sup>th</sup> Path */
-  public long getOffset(int i) {
-    return startoffset[i];
-  }
-  
-  /** Returns the length of the i<sup>th</sup> Path */
-  public long getLength(int i) {
-    return lengths[i];
-  }
-  
-  /** Returns the number of Paths in the split */
-  public int getNumPaths() {
-    return paths.length;
-  }
-
-  /** Returns the i<sup>th</sup> Path */
-  public Path getPath(int i) {
-    return paths[i];
-  }
-  
-  /** Returns all the Paths in the split */
-  public Path[] getPaths() {
-    return paths;
-  }
-
-  /** Returns all the Paths where this input-split resides */
-  public String[] getLocations() throws IOException {
-    return locations;
-  }
-
-  public void readFields(DataInput in) throws IOException {
-    totLength = in.readLong();
-    int arrLength = in.readInt();
-    lengths = new long[arrLength];
-    for(int i=0; i<arrLength;i++) {
-      lengths[i] = in.readLong();
-    }
-    int filesLength = in.readInt();
-    paths = new Path[filesLength];
-    for(int i=0; i<filesLength;i++) {
-      paths[i] = new Path(Text.readString(in));
-    }
-    arrLength = in.readInt();
-    startoffset = new long[arrLength];
-    for(int i=0; i<arrLength;i++) {
-      startoffset[i] = in.readLong();
-    }
-  }
-
-  public void write(DataOutput out) throws IOException {
-    out.writeLong(totLength);
-    out.writeInt(lengths.length);
-    for(long length : lengths) {
-      out.writeLong(length);
-    }
-    out.writeInt(paths.length);
-    for(Path p : paths) {
-      Text.writeString(out, p.toString());
-    }
-    out.writeInt(startoffset.length);
-    for(long length : startoffset) {
-      out.writeLong(length);
-    }
-  }
-  
-  @Override
- public String toString() {
-    StringBuffer sb = new StringBuffer();
-    for (int i = 0; i < paths.length; i++) {
-      if (i == 0 ) {
-        sb.append("Paths:");
-      }
-      sb.append(paths[i].toUri().getPath() + ":" + startoffset[i] +
-                "+" + lengths[i]);
-      if (i < paths.length -1) {
-        sb.append(",");
-      }
-    }
-    if (locations != null) {
-      String locs = "";
-      StringBuffer locsb = new StringBuffer();
-      for (int i = 0; i < locations.length; i++) {
-        locsb.append(locations[i] + ":");
-      }
-      locs = locsb.toString();
-      sb.append(" Locations:" + locs + "; ");
-    }
-    return sb.toString();
-  }
 }

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedComparator.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedComparator.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedComparator.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedComparator.java Tue Jun 16 20:54:24 2009
@@ -106,7 +106,7 @@
     }
     int compareResult = 0;
     if (!key.numeric) {
-      compareResult = compareBytes(first, start1, end1, second, start2, end2);
+      compareResult = compareBytes(first, start1, end1-start1+1, second, start2, end2-start2+1);
     }
     if (key.numeric) {
       compareResult = numericalCompare (first, start1, end1, second, start2, end2);
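
The one-line fix above corrects the arguments passed to WritableComparator.compareBytes, which takes (buffer, start, length) triples rather than end offsets; because end1 and end2 are inclusive offsets of the last byte of the key field, the field length is end - start + 1. A small standalone illustration of the corrected call, using made-up buffers:

import org.apache.hadoop.io.WritableComparator;

public class CompareBytesDemo {
  public static void main(String[] args) {
    byte[] first = "apple".getBytes();
    byte[] second = "apples".getBytes();
    // Inclusive start/end offsets of the field within each buffer.
    int start1 = 0, end1 = first.length - 1;
    int start2 = 0, end2 = second.length - 1;
    // compareBytes expects lengths, not end offsets: end - start + 1.
    int result = WritableComparator.compareBytes(
        first, start1, end1 - start1 + 1,
        second, start2, end2 - start2 + 1);
    System.out.println(result);  // negative: "apple" sorts before "apples"
  }
}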

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/db/DBConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/db/DBConfiguration.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/db/DBConfiguration.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/db/DBConfiguration.java Tue Jun 16 20:54:24 2009
@@ -18,70 +18,71 @@
 
 package org.apache.hadoop.mapred.lib.db;
 
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.lib.db.DBInputFormat.NullDBWritable;
 
 /**
- * A container for configuration property names for jobs with DB input/output. 
- * <br>
- * The job can be configured using the static methods in this class, 
- * {@link DBInputFormat}, and {@link DBOutputFormat}. 
- * <p> 
- * Alternatively, the properties can be set in the configuration with proper
- * values. 
- *   
- * @see DBConfiguration#configureDB(JobConf, String, String, String, String)
- * @see DBInputFormat#setInput(JobConf, Class, String, String)
- * @see DBInputFormat#setInput(JobConf, Class, String, String, String, String...)
- * @see DBOutputFormat#setOutput(JobConf, String, String...)
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.db.DBConfiguration} instead 
  */
-public class DBConfiguration {
-
+@Deprecated
+public class DBConfiguration extends 
+    org.apache.hadoop.mapreduce.lib.db.DBConfiguration {
   /** The JDBC Driver class name */
-  public static final String DRIVER_CLASS_PROPERTY = "mapred.jdbc.driver.class";
+  public static final String DRIVER_CLASS_PROPERTY = 
+    org.apache.hadoop.mapreduce.lib.db.DBConfiguration.DRIVER_CLASS_PROPERTY;
   
   /** JDBC Database access URL */
-  public static final String URL_PROPERTY = "mapred.jdbc.url";
+  public static final String URL_PROPERTY = 
+    org.apache.hadoop.mapreduce.lib.db.DBConfiguration.URL_PROPERTY;
 
   /** User name to access the database */
-  public static final String USERNAME_PROPERTY = "mapred.jdbc.username";
+  public static final String USERNAME_PROPERTY = 
+    org.apache.hadoop.mapreduce.lib.db.DBConfiguration.USERNAME_PROPERTY;
   
   /** Password to access the database */
-  public static final String PASSWORD_PROPERTY = "mapred.jdbc.password";
+  public static final String PASSWORD_PROPERTY = 
+    org.apache.hadoop.mapreduce.lib.db.DBConfiguration.PASSWORD_PROPERTY;
 
   /** Input table name */
-  public static final String INPUT_TABLE_NAME_PROPERTY = "mapred.jdbc.input.table.name";
+  public static final String INPUT_TABLE_NAME_PROPERTY = org.apache.hadoop.
+    mapreduce.lib.db.DBConfiguration.INPUT_TABLE_NAME_PROPERTY;
 
   /** Field names in the Input table */
-  public static final String INPUT_FIELD_NAMES_PROPERTY = "mapred.jdbc.input.field.names";
+  public static final String INPUT_FIELD_NAMES_PROPERTY = org.apache.hadoop.
+    mapreduce.lib.db.DBConfiguration.INPUT_FIELD_NAMES_PROPERTY;
 
   /** WHERE clause in the input SELECT statement */
-  public static final String INPUT_CONDITIONS_PROPERTY = "mapred.jdbc.input.conditions";
+  public static final String INPUT_CONDITIONS_PROPERTY = org.apache.hadoop.
+    mapreduce.lib.db.DBConfiguration.INPUT_CONDITIONS_PROPERTY;
   
   /** ORDER BY clause in the input SELECT statement */
-  public static final String INPUT_ORDER_BY_PROPERTY = "mapred.jdbc.input.orderby";
+  public static final String INPUT_ORDER_BY_PROPERTY = org.apache.hadoop.
+    mapreduce.lib.db.DBConfiguration.INPUT_ORDER_BY_PROPERTY;
   
   /** Whole input query, excluding LIMIT...OFFSET */
-  public static final String INPUT_QUERY = "mapred.jdbc.input.query";
+  public static final String INPUT_QUERY = 
+    org.apache.hadoop.mapreduce.lib.db.DBConfiguration.INPUT_QUERY;
   
   /** Input query to get the count of records */
-  public static final String INPUT_COUNT_QUERY = "mapred.jdbc.input.count.query";
+  public static final String INPUT_COUNT_QUERY = 
+    org.apache.hadoop.mapreduce.lib.db.DBConfiguration.INPUT_COUNT_QUERY;
   
   /** Class name implementing DBWritable which will hold input tuples */
-  public static final String INPUT_CLASS_PROPERTY = "mapred.jdbc.input.class";
+  public static final String INPUT_CLASS_PROPERTY = 
+    org.apache.hadoop.mapreduce.lib.db.DBConfiguration.INPUT_CLASS_PROPERTY;
 
   /** Output table name */
-  public static final String OUTPUT_TABLE_NAME_PROPERTY = "mapred.jdbc.output.table.name";
+  public static final String OUTPUT_TABLE_NAME_PROPERTY = org.apache.hadoop.
+    mapreduce.lib.db.DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY;
 
   /** Field names in the Output table */
-  public static final String OUTPUT_FIELD_NAMES_PROPERTY = "mapred.jdbc.output.field.names";  
+  public static final String OUTPUT_FIELD_NAMES_PROPERTY = org.apache.hadoop.
+    mapreduce.lib.db.DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY;  
 
   /** Number of fields in the Output table */
-  public static final String OUTPUT_FIELD_COUNT_PROPERTY = "mapred.jdbc.output.field.count";  
+  public static final String OUTPUT_FIELD_COUNT_PROPERTY = org.apache.hadoop.
+    mapreduce.lib.db.DBConfiguration.OUTPUT_FIELD_COUNT_PROPERTY;
+
   
   /**
    * Sets the DB access related fields in the JobConf.  
@@ -112,115 +113,8 @@
     configureDB(job, driverClass, dbUrl, null, null);
   }
 
-  private JobConf job;
-
   DBConfiguration(JobConf job) {
-    this.job = job;
-  }
-
-  /** Returns a connection object o the DB 
-   * @throws ClassNotFoundException 
-   * @throws SQLException */
-  Connection getConnection() throws ClassNotFoundException, SQLException{
-
-    Class.forName(job.get(DBConfiguration.DRIVER_CLASS_PROPERTY));
-
-    if(job.get(DBConfiguration.USERNAME_PROPERTY) == null) {
-      return DriverManager.getConnection(job.get(DBConfiguration.URL_PROPERTY));
-    } else {
-      return DriverManager.getConnection(
-          job.get(DBConfiguration.URL_PROPERTY), 
-          job.get(DBConfiguration.USERNAME_PROPERTY), 
-          job.get(DBConfiguration.PASSWORD_PROPERTY));
-    }
-  }
-
-  String getInputTableName() {
-    return job.get(DBConfiguration.INPUT_TABLE_NAME_PROPERTY);
-  }
-
-  void setInputTableName(String tableName) {
-    job.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName);
-  }
-
-  String[] getInputFieldNames() {
-    return job.getStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY);
-  }
-
-  void setInputFieldNames(String... fieldNames) {
-    job.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, fieldNames);
-  }
-
-  String getInputConditions() {
-    return job.get(DBConfiguration.INPUT_CONDITIONS_PROPERTY);
-  }
-
-  void setInputConditions(String conditions) {
-    if (conditions != null && conditions.length() > 0)
-      job.set(DBConfiguration.INPUT_CONDITIONS_PROPERTY, conditions);
-  }
-
-  String getInputOrderBy() {
-    return job.get(DBConfiguration.INPUT_ORDER_BY_PROPERTY);
-  }
-  
-  void setInputOrderBy(String orderby) {
-    if(orderby != null && orderby.length() >0) {
-      job.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, orderby);
-    }
-  }
-  
-  String getInputQuery() {
-    return job.get(DBConfiguration.INPUT_QUERY);
-  }
-  
-  void setInputQuery(String query) {
-    if(query != null && query.length() >0) {
-      job.set(DBConfiguration.INPUT_QUERY, query);
-    }
-  }
-  
-  String getInputCountQuery() {
-    return job.get(DBConfiguration.INPUT_COUNT_QUERY);
-  }
-  
-  void setInputCountQuery(String query) {
-    if(query != null && query.length() >0) {
-      job.set(DBConfiguration.INPUT_COUNT_QUERY, query);
-    }
-  }
-  
-  
-  Class<?> getInputClass() {
-    return job.getClass(DBConfiguration.INPUT_CLASS_PROPERTY, NullDBWritable.class);
-  }
-
-  void setInputClass(Class<? extends DBWritable> inputClass) {
-    job.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, inputClass, DBWritable.class);
-  }
-
-  String getOutputTableName() {
-    return job.get(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY);
-  }
-
-  void setOutputTableName(String tableName) {
-    job.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName);
-  }
-
-  String[] getOutputFieldNames() {
-    return job.getStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY);
-  }
-
-  void setOutputFieldNames(String... fieldNames) {
-    job.setStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, fieldNames);
-  }
-
-  void setOutputFieldCount(int fieldCount) {
-    job.setInt(DBConfiguration.OUTPUT_FIELD_COUNT_PROPERTY, fieldCount);
-  }
-  
-  int getOutputFieldCount() {
-    return job.getInt(OUTPUT_FIELD_COUNT_PROPERTY, 0);
+    super(job);
   }
   
 }
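
After this change the deprecated DBConfiguration keeps its constant names, now sourced from org.apache.hadoop.mapreduce.lib.db.DBConfiguration, and retains the static configureDB helpers, so old-API job setup code keeps compiling. A minimal sketch of that setup; the HSQLDB driver, URL and credentials are placeholders rather than values taken from this commit.

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;

public class ConfigureDBDemo {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    // Placeholder connection details; substitute a real driver, URL and account.
    DBConfiguration.configureDB(job,
        "org.hsqldb.jdbcDriver",              // JDBC driver class
        "jdbc:hsqldb:hsql://localhost/mydb",  // JDBC access URL
        "sa",                                 // user name
        "");                                  // password
    // The deprecated constant now resolves to the new class's property name.
    System.out.println(job.get(DBConfiguration.DRIVER_CLASS_PROPERTY));
  }
}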

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/db/DBInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/db/DBInputFormat.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/db/DBInputFormat.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/db/DBInputFormat.java Tue Jun 16 20:54:24 2009
@@ -18,14 +18,9 @@
 
 package org.apache.hadoop.mapred.lib.db;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
+import java.util.List;
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
@@ -35,100 +30,31 @@
 import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapreduce.Job;
 
 /**
- * A InputFormat that reads input data from an SQL table.
- * <p>
- * DBInputFormat emits LongWritables containing the record number as 
- * key and DBWritables as value. 
- * 
- * The SQL query, and input class can be using one of the two 
- * setInput methods.
+ * @deprecated 
+ * Use {@link org.apache.hadoop.mapreduce.lib.db.DBInputFormat} instead.
  */
+@Deprecated
 public class DBInputFormat<T  extends DBWritable>
-  implements InputFormat<LongWritable, T>, JobConfigurable {
+    extends org.apache.hadoop.mapreduce.lib.db.DBInputFormat<T> 
+    implements InputFormat<LongWritable, T>, JobConfigurable {
   /**
    * A RecordReader that reads records from a SQL table.
    * Emits LongWritables containing the record number as 
    * key and DBWritables as value.  
    */
-  protected class DBRecordReader implements
-  RecordReader<LongWritable, T> {
-    private ResultSet results;
-
-    private Statement statement;
-
-    private Class<T> inputClass;
-
-    private JobConf job;
-
-    private DBInputSplit split;
-
-    private long pos = 0;
-
+  protected class DBRecordReader extends
+      org.apache.hadoop.mapreduce.lib.db.DBInputFormat<T>.DBRecordReader
+      implements RecordReader<LongWritable, T> {
     /**
      * @param split The InputSplit to read data for
      * @throws SQLException 
      */
-    protected DBRecordReader(DBInputSplit split, Class<T> inputClass, JobConf job) throws SQLException {
-      this.inputClass = inputClass;
-      this.split = split;
-      this.job = job;
-      
-      statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
-
-      //statement.setFetchSize(Integer.MIN_VALUE);
-      results = statement.executeQuery(getSelectQuery());
-    }
-
-    /** Returns the query for selecting the records, 
-     * subclasses can override this for custom behaviour.*/
-    protected String getSelectQuery() {
-      StringBuilder query = new StringBuilder();
-      
-      if(dbConf.getInputQuery() == null) {
-        query.append("SELECT ");
-
-        for (int i = 0; i < fieldNames.length; i++) {
-          query.append(fieldNames[i]);
-          if(i != fieldNames.length -1) {
-            query.append(", ");
-          }
-        }
-
-        query.append(" FROM ").append(tableName);
-        query.append(" AS ").append(tableName); //in hsqldb this is necessary
-        if (conditions != null && conditions.length() > 0)
-          query.append(" WHERE (").append(conditions).append(")");
-        String orderBy = dbConf.getInputOrderBy();
-        if(orderBy != null && orderBy.length() > 0) {
-          query.append(" ORDER BY ").append(orderBy);
-        }
-      }
-      else {
-        query.append(dbConf.getInputQuery());
-      }
-
-      try {
-        query.append(" LIMIT ").append(split.getLength());
-        query.append(" OFFSET ").append(split.getStart());
-      }
-      catch (IOException ex) {
-        //ignore, will not throw
-      }
-      return query.toString();
-    }
-
-    /** {@inheritDoc} */
-    public void close() throws IOException {
-      try {
-        connection.commit();
-        results.close();
-        statement.close();
-      } catch (SQLException e) {
-        throw new IOException(e.getMessage());
-      }
+    protected DBRecordReader(DBInputSplit split, Class<T> inputClass, 
+      JobConf job) throws SQLException {
+     super(split, inputClass, job);
     }
 
     /** {@inheritDoc} */
@@ -138,59 +64,32 @@
 
     /** {@inheritDoc} */
     public T createValue() {
-      return ReflectionUtils.newInstance(inputClass, job);
+      return super.createValue();
     }
 
-    /** {@inheritDoc} */
     public long getPos() throws IOException {
-      return pos;
-    }
-
-    /** {@inheritDoc} */
-    public float getProgress() throws IOException {
-      return pos / (float)split.getLength();
+      return super.getPos();
     }
 
     /** {@inheritDoc} */
     public boolean next(LongWritable key, T value) throws IOException {
-      try {
-        if (!results.next())
-          return false;
-
-        // Set the key field value as the output key value
-        key.set(pos + split.getStart());
-
-        value.readFields(results);
-
-        pos ++;
-      } catch (SQLException e) {
-        throw new IOException(e.getMessage());
-      }
-      return true;
+      return super.next(key, value);
     }
   }
 
   /**
    * A Class that does nothing, implementing DBWritable
    */
-  public static class NullDBWritable implements DBWritable, Writable {
-    @Override
-    public void readFields(DataInput in) throws IOException { }
-    @Override
-    public void readFields(ResultSet arg0) throws SQLException { }
-    @Override
-    public void write(DataOutput out) throws IOException { }
-    @Override
-    public void write(PreparedStatement arg0) throws SQLException { }
+  public static class NullDBWritable extends 
+      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable 
+      implements DBWritable, Writable {
   }
   /**
    * A InputSplit that spans a set of rows
    */
-  protected static class DBInputSplit implements InputSplit {
-
-    private long end = 0;
-    private long start = 0;
-
+  protected static class DBInputSplit extends 
+      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit 
+      implements InputSplit {
     /**
      * Default Constructor
      */
@@ -203,77 +102,13 @@
      * @param end the index of the last row to select
      */
     public DBInputSplit(long start, long end) {
-      this.start = start;
-      this.end = end;
-    }
-
-    /** {@inheritDoc} */
-    public String[] getLocations() throws IOException {
-      // TODO Add a layer to enable SQL "sharding" and support locality
-      return new String[] {};
-    }
-
-    /**
-     * @return The index of the first row to select
-     */
-    public long getStart() {
-      return start;
-    }
-
-    /**
-     * @return The index of the last row to select
-     */
-    public long getEnd() {
-      return end;
-    }
-
-    /**
-     * @return The total row count in this split
-     */
-    public long getLength() throws IOException {
-      return end - start;
-    }
-
-    /** {@inheritDoc} */
-    public void readFields(DataInput input) throws IOException {
-      start = input.readLong();
-      end = input.readLong();
-    }
-
-    /** {@inheritDoc} */
-    public void write(DataOutput output) throws IOException {
-      output.writeLong(start);
-      output.writeLong(end);
+      super(start, end);
     }
   }
 
-  private String conditions;
-
-  private Connection connection;
-
-  private String tableName;
-
-  private String[] fieldNames;
-
-  private DBConfiguration dbConf;
-
   /** {@inheritDoc} */
   public void configure(JobConf job) {
-
-    dbConf = new DBConfiguration(job);
-
-    try {
-      this.connection = dbConf.getConnection();
-      this.connection.setAutoCommit(false);
-      connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
-    }
-    catch (Exception ex) {
-      throw new RuntimeException(ex);
-    }
-
-    tableName = dbConf.getInputTableName();
-    fieldNames = dbConf.getInputFieldNames();
-    conditions = dbConf.getInputConditions();
+    super.setConf(job);
   }
 
   /** {@inheritDoc} */
@@ -281,7 +116,7 @@
   public RecordReader<LongWritable, T> getRecordReader(InputSplit split,
       JobConf job, Reporter reporter) throws IOException {
 
-    Class inputClass = dbConf.getInputClass();
+    Class inputClass = super.getDBConf().getInputClass();
     try {
       return new DBRecordReader((DBInputSplit) split, inputClass, job);
     }
@@ -292,63 +127,16 @@
 
   /** {@inheritDoc} */
   public InputSplit[] getSplits(JobConf job, int chunks) throws IOException {
-
-	ResultSet results = null;  
-	Statement statement = null;
-    try {
-      statement = connection.createStatement();
-
-      results = statement.executeQuery(getCountQuery());
-      results.next();
-
-      long count = results.getLong(1);
-      long chunkSize = (count / chunks);
-
-      results.close();
-      statement.close();
-
-      InputSplit[] splits = new InputSplit[chunks];
-
-      // Split the rows into n-number of chunks and adjust the last chunk
-      // accordingly
-      for (int i = 0; i < chunks; i++) {
-        DBInputSplit split;
-
-        if ((i + 1) == chunks)
-          split = new DBInputSplit(i * chunkSize, count);
-        else
-          split = new DBInputSplit(i * chunkSize, (i * chunkSize)
-              + chunkSize);
-
-        splits[i] = split;
-      }
-
-      return splits;
-    } catch (SQLException e) {
-      try {
-        if (results != null) { results.close(); }
-      } catch (SQLException e1) {}
-      try {
-        if (statement != null) { statement.close(); }
-      } catch (SQLException e1) {}
-      throw new IOException(e.getMessage());
+    List<org.apache.hadoop.mapreduce.InputSplit> newSplits = 
+      super.getSplits(new Job(job));
+    InputSplit[] ret = new InputSplit[newSplits.size()];
+    int i = 0;
+    for (org.apache.hadoop.mapreduce.InputSplit s : newSplits) {
+      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit split = 
+    	(org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit)s;
+      ret[i++] = new DBInputSplit(split.getStart(), split.getEnd());
     }
-  }
-
-  /** Returns the query for getting the total number of rows, 
-   * subclasses can override this for custom behaviour.*/
-  protected String getCountQuery() {
-    
-    if(dbConf.getInputCountQuery() != null) {
-      return dbConf.getInputCountQuery();
-    }
-    
-    StringBuilder query = new StringBuilder();
-    query.append("SELECT COUNT(*) FROM " + tableName);
-
-    if (conditions != null && conditions.length() > 0)
-      query.append(" WHERE " + conditions);
-    return query.toString();
+    return ret;
   }
 
   /**

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java Tue Jun 16 20:54:24 2009
@@ -23,120 +23,41 @@
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.StringUtils;
 
 /**
- * A OutputFormat that sends the reduce output to a SQL table.
- * <p> 
- * {@link DBOutputFormat} accepts &lt;key,value&gt; pairs, where 
- * key has a type extending DBWritable. Returned {@link RecordWriter} 
- * writes <b>only the key</b> to the database with a batch SQL query.  
- * 
+ * @deprecated Use org.apache.hadoop.mapreduce.lib.db.DBOutputFormat instead
  */
+@Deprecated
 public class DBOutputFormat<K  extends DBWritable, V> 
-implements OutputFormat<K,V> {
-
-  private static final Log LOG = LogFactory.getLog(DBOutputFormat.class);
+    extends org.apache.hadoop.mapreduce.lib.db.DBOutputFormat<K, V>
+    implements OutputFormat<K, V> {
 
   /**
    * A RecordWriter that writes the reduce output to a SQL table
    */
-  protected class DBRecordWriter 
-  implements RecordWriter<K, V> {
-
-    private Connection connection;
-    private PreparedStatement statement;
-
-    protected DBRecordWriter(Connection connection
-        , PreparedStatement statement) throws SQLException {
-      this.connection = connection;
-      this.statement = statement;
-      this.connection.setAutoCommit(false);
+  protected class DBRecordWriter extends 
+      org.apache.hadoop.mapreduce.lib.db.DBOutputFormat<K, V>.DBRecordWriter
+      implements RecordWriter<K, V> {
+
+    protected DBRecordWriter(Connection connection, 
+      PreparedStatement statement) throws SQLException {
+      super(connection, statement);
     }
 
     /** {@inheritDoc} */
     public void close(Reporter reporter) throws IOException {
-      try {
-        statement.executeBatch();
-        connection.commit();
-      } catch (SQLException e) {
-        try {
-          connection.rollback();
-        }
-        catch (SQLException ex) {
-          LOG.warn(StringUtils.stringifyException(ex));
-        }
-        throw new IOException(e.getMessage());
-      } finally {
-        try {
-          statement.close();
-          connection.close();
-        }
-        catch (SQLException ex) {
-          throw new IOException(ex.getMessage());
-        }
-      }
-    }
-
-    /** {@inheritDoc} */
-    public void write(K key, V value) throws IOException {
-      try {
-        key.write(statement);
-        statement.addBatch();
-      } catch (SQLException e) {
-        e.printStackTrace();
-      }
+      super.close(null);
     }
   }
 
-  /**
-   * Constructs the query used as the prepared statement to insert data.
-   * 
-   * @param table
-   *          the table to insert into
-   * @param fieldNames
-   *          the fields to insert into. If field names are unknown, supply an
-   *          array of nulls.
-   */
-  protected String constructQuery(String table, String[] fieldNames) {
-    if(fieldNames == null) {
-      throw new IllegalArgumentException("Field names may not be null");
-    }
-
-    StringBuilder query = new StringBuilder();
-    query.append("INSERT INTO ").append(table);
-
-    if (fieldNames.length > 0 && fieldNames[0] != null) {
-      query.append(" (");
-      for (int i = 0; i < fieldNames.length; i++) {
-        query.append(fieldNames[i]);
-        if (i != fieldNames.length - 1) {
-          query.append(",");
-        }
-      }
-      query.append(")");
-    }
-    query.append(" VALUES (");
-
-    for (int i = 0; i < fieldNames.length; i++) {
-      query.append("?");
-      if(i != fieldNames.length - 1) {
-        query.append(",");
-      }
-    }
-    query.append(");");
-
-    return query.toString();
-  }
-
   /** {@inheritDoc} */
   public void checkOutputSpecs(FileSystem filesystem, JobConf job)
   throws IOException {
@@ -146,24 +67,15 @@
   /** {@inheritDoc} */
   public RecordWriter<K, V> getRecordWriter(FileSystem filesystem,
       JobConf job, String name, Progressable progress) throws IOException {
-
-    DBConfiguration dbConf = new DBConfiguration(job);
-    String tableName = dbConf.getOutputTableName();
-    String[] fieldNames = dbConf.getOutputFieldNames();
-    
-    if(fieldNames == null) {
-      fieldNames = new String[dbConf.getOutputFieldCount()];
-    }
-    
+    org.apache.hadoop.mapreduce.RecordWriter<K, V> w = super.getRecordWriter(
+      new TaskAttemptContext(job, 
+            TaskAttemptID.forName(job.get("mapred.task.id"))));
+    org.apache.hadoop.mapreduce.lib.db.DBOutputFormat.DBRecordWriter writer = 
+     (org.apache.hadoop.mapreduce.lib.db.DBOutputFormat.DBRecordWriter) w;
     try {
-      Connection connection = dbConf.getConnection();
-      PreparedStatement statement = null;
-  
-      statement = connection.prepareStatement(constructQuery(tableName, fieldNames));
-      return new DBRecordWriter(connection, statement);
-    }
-    catch (Exception ex) {
-      throw new IOException(ex.getMessage());
+      return new DBRecordWriter(writer.getConnection(), writer.getStatement());
+    } catch(SQLException se) {
+      throw new IOException(se);
     }
   }
 

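For context on the delegation above: the deprecated org.apache.hadoop.mapred.lib.db.DBOutputFormat now wraps the new org.apache.hadoop.mapreduce.lib.db implementation. A minimal sketch of configuring the replacement output format for a job follows; the driver class, connection URL, table and column names are illustrative placeholders, and the configureDB/setOutput helpers are assumed from the new mapreduce.lib.db API rather than taken from this commit.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
    import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;

    public class DBOutputExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Illustrative JDBC driver and connection string.
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
            "jdbc:mysql://localhost/testdb");
        Job job = new Job(conf, "db-output-example");
        job.setOutputFormatClass(DBOutputFormat.class);
        // Rows are written into MyTable(counter, timestamp) from the
        // DBWritable keys emitted by the reducer (mapper/reducer setup elided).
        DBOutputFormat.setOutput(job, "MyTable", "counter", "timestamp");
      }
    }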
Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/db/DBWritable.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/db/DBWritable.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/db/DBWritable.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/lib/db/DBWritable.java Tue Jun 16 20:54:24 2009
@@ -1,75 +1,11 @@
 package org.apache.hadoop.mapred.lib.db;
 
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-import org.apache.hadoop.io.Writable;
-
 /**
- * Objects that are read from/written to a database should implement
- * <code>DBWritable</code>. DBWritable, is similar to {@link Writable} 
- * except that the {@link #write(PreparedStatement)} method takes a 
- * {@link PreparedStatement}, and {@link #readFields(ResultSet)} 
- * takes a {@link ResultSet}. 
- * <p>
- * Implementations are responsible for writing the fields of the object 
- * to PreparedStatement, and reading the fields of the object from the 
- * ResultSet. 
- * 
- * <p>Example:</p>
- * If we have the following table in the database :
- * <pre>
- * CREATE TABLE MyTable (
- *   counter        INTEGER NOT NULL,
- *   timestamp      BIGINT  NOT NULL,
- * );
- * </pre>
- * then we can read/write the tuples from/to the table with :
- * <p><pre>
- * public class MyWritable implements Writable, DBWritable {
- *   // Some data     
- *   private int counter;
- *   private long timestamp;
- *       
- *   //Writable#write() implementation
- *   public void write(DataOutput out) throws IOException {
- *     out.writeInt(counter);
- *     out.writeLong(timestamp);
- *   }
- *       
- *   //Writable#readFields() implementation
- *   public void readFields(DataInput in) throws IOException {
- *     counter = in.readInt();
- *     timestamp = in.readLong();
- *   }
- *       
- *   public void write(PreparedStatement statement) throws SQLException {
- *     statement.setInt(1, counter);
- *     statement.setLong(2, timestamp);
- *   }
- *       
- *   public void readFields(ResultSet resultSet) throws SQLException {
- *     counter = resultSet.getInt(1);
- *     timestamp = resultSet.getLong(2);
- *   } 
- * }
- * </pre></p>
+ * @deprecated 
+ * Use {@link org.apache.hadoop.mapreduce.lib.db.DBWritable} instead
  */
-public interface DBWritable {
-
-  /**
-   * Sets the fields of the object in the {@link PreparedStatement}.
-   * @param statement the statement that the fields are put into.
-   * @throws SQLException
-   */
-	public void write(PreparedStatement statement) throws SQLException;
-	
-	/**
-	 * Reads the fields of the object from the {@link ResultSet}. 
-	 * @param resultSet the {@link ResultSet} to get the fields from.
-	 * @throws SQLException
-	 */
-	public void readFields(ResultSet resultSet) throws SQLException ; 
+@Deprecated
+public interface DBWritable 
+    extends org.apache.hadoop.mapreduce.lib.db.DBWritable {
 	
 }

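The worked example deleted from the javadoc above carries over unchanged to the replacement interface; restated here as a self-contained sketch against org.apache.hadoop.mapreduce.lib.db.DBWritable, with the same MyTable(counter INTEGER, timestamp BIGINT) layout.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;

    // Adapted from the example removed from the old javadoc above.
    public class MyWritable implements Writable, DBWritable {
      private int counter;
      private long timestamp;

      // Writable#write(): serialize fields for the MapReduce framework.
      public void write(DataOutput out) throws IOException {
        out.writeInt(counter);
        out.writeLong(timestamp);
      }

      // Writable#readFields(): deserialize fields.
      public void readFields(DataInput in) throws IOException {
        counter = in.readInt();
        timestamp = in.readLong();
      }

      // DBWritable#write(): bind fields to the INSERT statement parameters.
      public void write(PreparedStatement statement) throws SQLException {
        statement.setInt(1, counter);
        statement.setLong(2, timestamp);
      }

      // DBWritable#readFields(): populate fields from a query result row.
      public void readFields(ResultSet resultSet) throws SQLException {
        counter = resultSet.getInt(1);
        timestamp = resultSet.getLong(2);
      }
    }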
Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapreduce/TaskCounter.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapreduce/TaskCounter.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapreduce/TaskCounter.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapreduce/TaskCounter.java Tue Jun 16 20:54:24 2009
@@ -23,7 +23,6 @@
   MAP_INPUT_RECORDS, 
   MAP_OUTPUT_RECORDS,
   MAP_SKIPPED_RECORDS,
-  MAP_INPUT_BYTES, 
   MAP_OUTPUT_BYTES,
   COMBINE_INPUT_RECORDS,
   COMBINE_OUTPUT_RECORDS,

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties Tue Jun 16 20:54:24 2009
@@ -3,7 +3,6 @@
 CounterGroupName=              Map-Reduce Framework
 
 MAP_INPUT_RECORDS.name=        Map input records
-MAP_INPUT_BYTES.name=          Map input bytes
 MAP_OUTPUT_RECORDS.name=       Map output records
 MAP_OUTPUT_BYTES.name=         Map output bytes
 MAP_SKIPPED_RECORDS.name=      Map skipped records

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Tue Jun 16 20:54:24 2009
@@ -49,6 +49,9 @@
  * not split-up and are processed as a whole by {@link Mapper}s.
  */
 public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
+  public static final String COUNTER_GROUP = 
+                                "FileInputFormatCounters";
+  public static final String BYTES_READ = "BYTES_READ";
 
   private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
 

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java Tue Jun 16 20:54:24 2009
@@ -28,9 +28,11 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.MapContext;
 import org.apache.hadoop.util.LineReader;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
@@ -49,10 +51,13 @@
   private int maxLineLength;
   private LongWritable key = null;
   private Text value = null;
+  private Counter inputByteCounter;
 
   public void initialize(InputSplit genericSplit,
                          TaskAttemptContext context) throws IOException {
     FileSplit split = (FileSplit) genericSplit;
+    inputByteCounter = ((MapContext)context).getCounter(
+      FileInputFormat.COUNTER_GROUP, FileInputFormat.BYTES_READ);
     Configuration job = context.getConfiguration();
     this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                                     Integer.MAX_VALUE);
@@ -101,6 +106,7 @@
         break;
       }
       pos += newSize;
+      inputByteCounter.increment(newSize);
       if (newSize < maxLineLength) {
         break;
       }

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileRecordReader.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileRecordReader.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileRecordReader.java Tue Jun 16 20:54:24 2009
@@ -25,7 +25,9 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.MapContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
@@ -38,12 +40,16 @@
   private K key = null;
   private V value = null;
   protected Configuration conf;
+  private Counter inputByteCounter;
+  private long pos;
   
   @Override
   public void initialize(InputSplit split, 
                          TaskAttemptContext context
                          ) throws IOException, InterruptedException {
     FileSplit fileSplit = (FileSplit) split;
+    inputByteCounter = ((MapContext)context).getCounter(
+      FileInputFormat.COUNTER_GROUP, FileInputFormat.BYTES_READ);
     conf = context.getConfiguration();    
     Path path = fileSplit.getPath();
     FileSystem fs = path.getFileSystem(conf);
@@ -64,7 +70,8 @@
     if (!more) {
       return false;
     }
-    long pos = in.getPosition();
+    inputByteCounter.increment(in.getPosition()-pos);
+    pos = in.getPosition();
     key = (K) in.next(key);
     if (key == null || (pos >= end && in.syncSeen())) {
       more = false;

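Both readers patched above follow the same pattern: look up the new FileInputFormat.COUNTER_GROUP / BYTES_READ counter through the MapContext during initialize(), then increment it as records are consumed. A minimal sketch of that pattern for a custom reader; the class and helper method names here are hypothetical, not part of this commit.

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.MapContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    // Hypothetical reader showing the counter pattern used by the readers
    // in this commit; the actual record parsing is elided.
    public abstract class CountingRecordReader
        extends RecordReader<LongWritable, Text> {

      private Counter inputByteCounter;

      @Override
      public void initialize(InputSplit split, TaskAttemptContext context)
          throws IOException, InterruptedException {
        // Same lookup the patched readers perform: the context handed to a
        // map-side reader is a MapContext, which exposes getCounter().
        inputByteCounter = ((MapContext) context).getCounter(
            FileInputFormat.COUNTER_GROUP, FileInputFormat.BYTES_READ);
      }

      // Subclasses call this after reading each record.
      protected void recordBytesRead(long bytes) {
        inputByteCounter.increment(bytes);
      }
    }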
Propchange: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Tue Jun 16 20:54:24 2009
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/test/mapred:713112
+/hadoop/core/trunk/src/test/mapred:776175-784663

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/ControlledMapReduceJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/ControlledMapReduceJob.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/ControlledMapReduceJob.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/ControlledMapReduceJob.java Tue Jun 16 20:54:24 2009
@@ -416,6 +416,8 @@
     conf.setInputFormat(ControlledMapReduceJob.class);
     FileInputFormat.addInputPath(conf, new Path("ignored"));
     conf.setOutputFormat(NullOutputFormat.class);
+    conf.setMapSpeculativeExecution(false);
+    conf.setReduceSpeculativeExecution(false);
 
     // Set the following for reduce tasks to be able to be started running
     // immediately along with maps.
@@ -573,4 +575,4 @@
       return new ControlledMapReduceJobRunner(conf, numMappers, numReducers);
     }
   }
-}
\ No newline at end of file
+}

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/SortValidator.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/SortValidator.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/SortValidator.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/SortValidator.java Tue Jun 16 20:54:24 2009
@@ -210,9 +210,9 @@
           try {
             URI inputURI = new URI(job.get("map.input.file"));
             String inputFile = inputURI.getPath();
-            partition = Integer.valueOf(
-                                        inputFile.substring(inputFile.lastIndexOf("part")+5)
-                                        ).intValue();
+            // part file is of the form part-r-xxxxx
+            partition = Integer.valueOf(inputFile.substring(
+              inputFile.lastIndexOf("part") + 7)).intValue();
             noSortReducers = job.getInt("sortvalidate.sort.reduce.tasks", -1);
           } catch (Exception e) {
             System.err.println("Caught: " + e);

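The fix above accounts for the new reduce-output naming: part files are now named part-r-xxxxx rather than part-xxxxx, so the numeric suffix begins seven characters after "part" instead of five. A quick illustration; the path is made up.

    // Illustrative only: extracting the partition number from the new
    // part-r-xxxxx output naming, as the patched SortValidator does.
    public class PartNameExample {
      public static void main(String[] args) {
        String inputFile = "/user/foo/sortvalidate/sortoutput/part-r-00003";
        // lastIndexOf("part") + 7 skips "part-r-", leaving "00003".
        int partition = Integer.valueOf(
            inputFile.substring(inputFile.lastIndexOf("part") + 7)).intValue();
        System.out.println(partition);   // prints 3
      }
    }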
Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestCounters.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestCounters.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestCounters.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestCounters.java Tue Jun 16 20:54:24 2009
@@ -70,7 +70,7 @@
   }
   
   public void testCounters() throws IOException {
-    Enum[] keysWithResource = {TaskCounter.MAP_INPUT_BYTES, 
+    Enum[] keysWithResource = {TaskCounter.MAP_INPUT_RECORDS, 
                                TaskCounter.MAP_OUTPUT_BYTES};
     
     Enum[] keysWithoutResource = {myCounters.TEST1, myCounters.TEST2};

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputFormat.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputFormat.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputFormat.java Tue Jun 16 20:54:24 2009
@@ -32,7 +32,7 @@
 public class TestFileOutputFormat extends HadoopTestCase {
 
   public TestFileOutputFormat() throws IOException {
-    super(HadoopTestCase.CLUSTER_MR, HadoopTestCase.LOCAL_FS, 1, 1);
+    super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
   }
 
   public void testCustomFile() throws Exception {

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java Tue Jun 16 20:54:24 2009
@@ -18,6 +18,7 @@
 package org.apache.hadoop.mapred;
 
 import java.io.BufferedReader;
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -27,14 +28,25 @@
 import java.util.Iterator;
 import java.util.StringTokenizer;
 
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.serializer.JavaSerializationComparator;
 
-public class TestJavaSerialization extends ClusterMapReduceTestCase {
-  
+public class TestJavaSerialization extends TestCase {
+
+  private static String TEST_ROOT_DIR =
+    new File(System.getProperty("test.build.data", "/tmp")).toURI()
+    .toString().replace(' ', '+');
+
+  private final Path INPUT_DIR = new Path(TEST_ROOT_DIR + "/input");
+  private final Path OUTPUT_DIR = new Path(TEST_ROOT_DIR + "/out");
+  private final Path INPUT_FILE = new Path(INPUT_DIR , "inp");
+
   static class WordCountMapper extends MapReduceBase implements
       Mapper<LongWritable, Text, String, Long> {
 
@@ -64,17 +76,26 @@
     }
     
   }
-  
-  public void testMapReduceJob() throws Exception {
-    OutputStream os = getFileSystem().create(new Path(getInputDir(),
-        "text.txt"));
+
+  private void cleanAndCreateInput(FileSystem fs) throws IOException {
+    fs.delete(INPUT_FILE, true);
+    fs.delete(OUTPUT_DIR, true);
+
+    OutputStream os = fs.create(INPUT_FILE);
+
     Writer wr = new OutputStreamWriter(os);
     wr.write("b a\n");
     wr.close();
+  }
+  
+  public void testMapReduceJob() throws Exception {
 
-    JobConf conf = createJobConf();
+    JobConf conf = new JobConf(TestJavaSerialization.class);
     conf.setJobName("JavaSerialization");
     
+    FileSystem fs = FileSystem.get(conf);
+    cleanAndCreateInput(fs);
+
     conf.set("io.serializations",
     "org.apache.hadoop.io.serializer.JavaSerialization," +
     "org.apache.hadoop.io.serializer.WritableSerialization");
@@ -88,17 +109,16 @@
     conf.setMapperClass(WordCountMapper.class);
     conf.setReducerClass(SumReducer.class);
 
-    FileInputFormat.setInputPaths(conf, getInputDir());
+    FileInputFormat.setInputPaths(conf, INPUT_DIR);
 
-    FileOutputFormat.setOutputPath(conf, getOutputDir());
+    FileOutputFormat.setOutputPath(conf, OUTPUT_DIR);
 
     JobClient.runJob(conf);
 
     Path[] outputFiles = FileUtil.stat2Paths(
-                           getFileSystem().listStatus(getOutputDir(),
-                           new OutputLogFilter()));
+        fs.listStatus(OUTPUT_DIR, new OutputLogFilter()));
     assertEquals(1, outputFiles.length);
-    InputStream is = getFileSystem().open(outputFiles[0]);
+    InputStream is = fs.open(outputFiles[0]);
     BufferedReader reader = new BufferedReader(new InputStreamReader(is));
     assertEquals("a\t1", reader.readLine());
     assertEquals("b\t1", reader.readLine());
@@ -108,27 +128,25 @@
 
   /**
    * HADOOP-4466:
-   * This test verifies the JavSerialization impl can write to SequenceFiles. by virtue other
-   * SequenceFileOutputFormat is not coupled to Writable types, if so, the job will fail.
+   * This test verifies that the JavaSerialization impl can write to
+   * SequenceFiles, because SequenceFileOutputFormat is not
+   * coupled to Writable types; if it were, the job would fail.
    *
    */
   public void testWriteToSequencefile() throws Exception {
-    OutputStream os = getFileSystem().create(new Path(getInputDir(),
-        "text.txt"));
-    Writer wr = new OutputStreamWriter(os);
-    wr.write("b a\n");
-    wr.close();
-
-    JobConf conf = createJobConf();
+    JobConf conf = new JobConf(TestJavaSerialization.class);
     conf.setJobName("JavaSerialization");
 
+    FileSystem fs = FileSystem.get(conf);
+    cleanAndCreateInput(fs);
+
     conf.set("io.serializations",
     "org.apache.hadoop.io.serializer.JavaSerialization," +
     "org.apache.hadoop.io.serializer.WritableSerialization");
 
     conf.setInputFormat(TextInputFormat.class);
-    conf.setOutputFormat(SequenceFileOutputFormat.class); // test we can write to sequence files
-
+    // test we can write to sequence files
+    conf.setOutputFormat(SequenceFileOutputFormat.class); 
     conf.setOutputKeyClass(String.class);
     conf.setOutputValueClass(Long.class);
     conf.setOutputKeyComparatorClass(JavaSerializationComparator.class);
@@ -136,16 +154,15 @@
     conf.setMapperClass(WordCountMapper.class);
     conf.setReducerClass(SumReducer.class);
 
-    FileInputFormat.setInputPaths(conf, getInputDir());
+    FileInputFormat.setInputPaths(conf, INPUT_DIR);
 
-    FileOutputFormat.setOutputPath(conf, getOutputDir());
+    FileOutputFormat.setOutputPath(conf, OUTPUT_DIR);
 
     JobClient.runJob(conf);
 
     Path[] outputFiles = FileUtil.stat2Paths(
-                           getFileSystem().listStatus(getOutputDir(),
-                           new OutputLogFilter()));
+        fs.listStatus(OUTPUT_DIR, new OutputLogFilter()));
     assertEquals(1, outputFiles.length);
-}
+  }
 
 }

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java Tue Jun 16 20:54:24 2009
@@ -37,7 +37,7 @@
   //end of the job (indirectly testing whether all tasktrackers
   //got a KillJobAction).
   private static final Log LOG =
-        LogFactory.getLog(TestEmptyJobWithDFS.class.getName());
+        LogFactory.getLog(TestEmptyJob.class.getName());
   private void runSleepJob(JobConf conf) throws Exception {
     String[] args = { "-m", "1", "-r", "10", "-mt", "1000", "-rt", "10000" };
     ToolRunner.run(conf, new SleepJob(), args);

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java Tue Jun 16 20:54:24 2009
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 /**
@@ -41,4 +44,28 @@
     assertTrue("Job failed", job.isSuccessful());
     assertOwnerShip(outDir);
   }
+  
+  public void testEnvironment() throws IOException {
+    if (!shouldRun()) {
+      return;
+    }
+    startCluster();
+    TestMiniMRChildTask childTask = new TestMiniMRChildTask();
+    Path inDir = new Path("input1");
+    Path outDir = new Path("output1");
+    try {
+      childTask.runTestTaskEnv(getClusterConf(), inDir, outDir);
+    } catch (IOException e) {
+      fail("IOException thrown while running enviroment test."
+          + e.getMessage());
+    } finally {
+      FileSystem outFs = outDir.getFileSystem(getClusterConf());
+      if (outFs.exists(outDir)) {
+        assertOwnerShip(outDir);
+        outFs.delete(outDir, true);
+      } else {
+        fail("Output directory does not exist" + outDir.toString());
+      }
+    }
+  }
 }

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java Tue Jun 16 20:54:24 2009
@@ -373,8 +373,8 @@
                    (status.equals("SUCCESS") || status.equals("FAILED") ||
                     status.equals("KILLED")));
 
-        // Reduce Task Attempts should have valid SHUFFLE_FINISHED time and
-        // SORT_FINISHED time
+        // Successful Reduce Task Attempts should have valid SHUFFLE_FINISHED
+        // time and SORT_FINISHED time
         if (type.equals("REDUCE") && status.equals("SUCCESS")) {
           time1 = attempt.get(Keys.SHUFFLE_FINISHED);
           assertTrue("SHUFFLE_FINISHED time of task attempt " + id +
@@ -389,6 +389,15 @@
           assertTrue("Reduce Task SORT_FINISHED time is < SORT_FINISHED time" +
                      " in history file", areTimesInOrder(time1, time));
         }
+        else if (type.equals("MAP") && status.equals("SUCCESS")) {
+          // Successful MAP Task Attempts should have valid MAP_FINISHED time
+          time1 = attempt.get(Keys.MAP_FINISHED);
+          assertTrue("MAP_FINISHED time of task attempt " + id +
+                     " is in unexpected format:" + time1 +
+                     " in history file", isTimeValid(time1));
+          assertTrue("MAP_FINISHED time of map task is < START_TIME " +
+                     "in history file", areTimesInOrder(time, time1));
+        }
 
         // check if hostname is valid
         String hostname = attempt.get(Keys.HOSTNAME);

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java Tue Jun 16 20:54:24 2009
@@ -34,11 +34,11 @@
  * recover previously submitted jobs.
  */
 public class TestJobTrackerRestart extends TestCase {
-  final Path testDir = 
+  static final Path testDir = 
     new Path(System.getProperty("test.build.data","/tmp"), 
              "jt-restart-testing");
   final Path inDir = new Path(testDir, "input");
-  final Path shareDir = new Path(testDir, "share");
+  static final Path shareDir = new Path(testDir, "share");
   final Path outputDir = new Path(testDir, "output");
   private static int numJobsSubmitted = 0;
   
@@ -400,6 +400,115 @@
            && status.getReduceTasks() == 0;
   }
   
+  /** Committer with setup waiting
+   */
+  static class CommitterWithDelaySetup extends FileOutputCommitter {
+    @Override
+    public void setupJob(JobContext context) throws IOException {
+      FileSystem fs = FileSystem.get(context.getConfiguration());
+      while (true) {
+        if (fs.exists(shareDir)) {
+          break;
+        }
+        UtilsForTests.waitFor(100);
+      }
+      super.cleanupJob(context);
+    }
+  }
+
+  /** Tests a job on jobtracker with restart-recovery turned on and empty 
+   *  jobhistory file.
+   * Preparation :
+   *    - Configure a job with
+   *       - num-maps : 0 (long waiting setup)
+   *       - num-reducers : 0
+   *    
+   * Check if the job succeeds after restart.
+   * 
+   * Assumes that map slots are given first for setup.
+   */
+  public void testJobRecoveryWithEmptyHistory(MiniDFSCluster dfs, 
+                                              MiniMRCluster mr) 
+  throws IOException {
+    mr.startTaskTracker(null, null, 1, 1);
+    FileSystem fileSys = dfs.getFileSystem();
+    
+    cleanUp(fileSys, shareDir);
+    cleanUp(fileSys, inDir);
+    cleanUp(fileSys, outputDir);
+    
+    JobConf conf = mr.createJobConf();
+    conf.setNumReduceTasks(0);
+    conf.setOutputCommitter(TestEmptyJob.CommitterWithDelayCleanup.class);
+    fileSys.delete(outputDir, false);
+    RunningJob job1 = 
+      UtilsForTests.runJob(conf, inDir, outputDir, 30, 0);
+    
+    conf.setNumReduceTasks(0);
+    conf.setOutputCommitter(CommitterWithDelaySetup.class);
+    Path inDir2 = new Path(testDir, "input2");
+    fileSys.mkdirs(inDir2);
+    Path outDir2 = new Path(testDir, "output2");
+    fileSys.delete(outDir2, false);
+    JobConf newConf = getJobs(mr.createJobConf(),
+                              new JobPriority[] {JobPriority.NORMAL},
+                              new int[] {10}, new int[] {0},
+                              outDir2, inDir2,
+                              getMapSignalFile(shareDir),
+                              getReduceSignalFile(shareDir))[0];
+
+    JobClient jobClient = new JobClient(newConf);
+    RunningJob job2 = jobClient.submitJob(newConf);
+    JobID id = job2.getID();
+
+    /*RunningJob job2 = 
+      UtilsForTests.runJob(mr.createJobConf(), inDir2, outDir2, 0);
+    
+    JobID id = job2.getID();*/
+    JobInProgress jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
+    
+    jip.initTasks();
+    
+    // find out the history filename
+    String history = 
+      JobHistory.JobInfo.getJobHistoryFileName(jip.getJobConf(), id);
+    Path historyPath = JobHistory.JobInfo.getJobHistoryLogLocation(history);
+    
+    //  make sure that setup is launched
+    while (jip.runningMaps() == 0) {
+      UtilsForTests.waitFor(100);
+    }
+    
+    id = job1.getID();
+    jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
+    
+    jip.initTasks();
+    
+    //  make sure that cleanup is launched and is waiting
+    while (!jip.isCleanupLaunched()) {
+      UtilsForTests.waitFor(100);
+    }
+    
+    mr.stopJobTracker();
+    
+    // delete the history file .. just to be safe.
+    FileSystem historyFS = historyPath.getFileSystem(conf);
+    historyFS.delete(historyPath, false);
+    historyFS.create(historyPath).close(); // create an empty file
+    
+    
+    UtilsForTests.signalTasks(dfs, fileSys, getMapSignalFile(shareDir), getReduceSignalFile(shareDir), (short)1);
+
+    // Turn on the recovery
+    mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", 
+                                      true);
+    
+    mr.startJobTracker();
+    
+    job1.waitForCompletion();
+    job2.waitForCompletion();
+  }
+  
   public void testJobTrackerRestart() throws IOException {
     String namenode = null;
     MiniDFSCluster dfs = null;
@@ -450,6 +559,9 @@
       
       // Test jobtracker with restart-recovery turned off
       testRestartWithoutRecovery(dfs, mr);
+      
+      // test recovery with empty file
+      testJobRecoveryWithEmptyHistory(dfs, mr);
     } finally {
       if (mr != null) {
         try {


