hadoop-common-commits mailing list archives

From: cutt...@apache.org
Subject: svn commit: r566798 [1/3] - in /lucene/hadoop/trunk: ./ src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/examples/org/apache/hadoop/examples/ src/examples/org/apache/hadoop/examp...
Date: Thu, 16 Aug 2007 18:45:56 GMT
Author: cutting
Date: Thu Aug 16 11:45:49 2007
New Revision: 566798

URL: http://svn.apache.org/viewvc?view=rev&rev=566798
Log:
HADOOP-1231.  Add generics to Mapper and Reducer interfaces.  Contributed by Tom White.
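
For context: this change parameterizes Mapper, Reducer, RecordReader, OutputCollector, and the related mapred interfaces by key and value type, so implementations declare their Writable types once and drop the explicit casts. A minimal sketch of a mapper under the new interface, modeled on the WordCount changes further down in this commit (the class name is illustrative, not part of the commit):

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    // Type parameters are <input key, input value, output key, output value>.
    public class TokenCounter extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {

      private static final IntWritable one = new IntWritable(1);
      private Text word = new Text();

      // Before this change the signature took raw WritableComparable/Writable
      // and needed a cast: String line = ((Text) value).toString();
      public void map(LongWritable key, Text value,
                      OutputCollector<Text, IntWritable> output,
                      Reporter reporter) throws IOException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
          word.set(itr.nextToken());
          output.collect(word, one);  // collector is typed <Text, IntWritable>
        }
      }
    }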

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/AggregateWordCount.java
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/dancing/DistributedPentomino.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunnable.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Mapper.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputCollector.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Partitioner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordReader.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordWriter.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reducer.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFormat.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/HashPartitioner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityMapper.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityReducer.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/InverseMapper.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedPartitioner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/LongSumReducer.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/NullOutputFormat.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/RegexMapper.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/TokenCountMapper.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/UserDefinedValueAggregatorDescriptor.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorBaseDescriptor.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorCombiner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorDescriptor.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorJobBase.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorMapper.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/ValueAggregatorReducer.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/OutputHandler.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesPartitioner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/UpwardProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/tools/Logalyzer.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
    lucene/hadoop/trunk/src/test/checkstyle.xml
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/AccumulatingReducer.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/IOMapperBase.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRBench.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/SortValidator.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestComparators.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapOutputType.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsTextInputFormat.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/lib/aggregate/AggregatorTests.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/pipes/WordCountInputFormat.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/record/TestRecordMR.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/record/TestRecordWritable.java
    lucene/hadoop/trunk/src/test/testjar/ClassWordCount.java
    lucene/hadoop/trunk/src/test/testjar/ExternalMapperReducer.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Aug 16 11:45:49 2007
@@ -53,6 +53,9 @@
     HADOOP-1693.  Remove un-needed log fields in DFS replication classes,
     since the log may be accessed statically. (Konstantin Shvachko via cutting)
 
+    HADOOP-1231.  Add generics to Mapper and Reducer interfaces.
+    (tomwhite via cutting)
+
 
 Release 0.14.0 - 2007-08-17
 

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java Thu Aug 16 11:45:49 2007
@@ -48,7 +48,9 @@
 /**
  * Convert HBase tabular data into a format that is consumable by Map/Reduce
  */
-public class TableInputFormat implements InputFormat, JobConfigurable {
+public class TableInputFormat
+  implements InputFormat<HStoreKey, KeyedDataArrayWritable>, JobConfigurable {
+  
   static final Logger LOG = Logger.getLogger(TableInputFormat.class.getName());
 
   /**
@@ -64,7 +66,7 @@
   /**
   * Iterate over HBase table data, returning (HStoreKey, KeyedDataArrayWritable) pairs
    */
-  class TableRecordReader implements RecordReader {
+  class TableRecordReader implements RecordReader<HStoreKey, KeyedDataArrayWritable> {
     private HScannerInterface m_scanner;
     private TreeMap<Text, byte[]> m_row; // current buffer
     private Text m_endRow;
@@ -95,7 +97,7 @@
      *
      * @see org.apache.hadoop.mapred.RecordReader#createKey()
      */
-    public WritableComparable createKey() {
+    public HStoreKey createKey() {
       return new HStoreKey();
     }
 
@@ -104,7 +106,7 @@
      *
      * @see org.apache.hadoop.mapred.RecordReader#createValue()
      */
-    public Writable createValue() {
+    public KeyedDataArrayWritable createValue() {
       return new KeyedDataArrayWritable();
     }
 
@@ -130,17 +132,17 @@
      * @return true if there was more data
      * @throws IOException
      */
-    public boolean next(Writable key, Writable value) throws IOException {
+    public boolean next(HStoreKey key, KeyedDataArrayWritable value) throws IOException {
       LOG.debug("start next");
       m_row.clear();
-      HStoreKey tKey = (HStoreKey)key;
+      HStoreKey tKey = key;
       boolean hasMore = m_scanner.next(tKey, m_row);
 
       if(hasMore) {
         if(m_endRow.getLength() > 0 && (tKey.getRow().compareTo(m_endRow) < 0)) {
           hasMore = false;
         } else {
-          KeyedDataArrayWritable rowVal = (KeyedDataArrayWritable) value;
+          KeyedDataArrayWritable rowVal = value;
           ArrayList<KeyedData> columns = new ArrayList<KeyedData>();
 
           for(Map.Entry<Text, byte[]> e: m_row.entrySet()) {
@@ -159,8 +161,8 @@
 
   }
 
-  /** {@inheritDoc} */
-  public RecordReader getRecordReader(InputSplit split,
+  public RecordReader<HStoreKey, KeyedDataArrayWritable> getRecordReader(
+      InputSplit split,
       @SuppressWarnings("unused") JobConf job,
       @SuppressWarnings("unused") Reporter reporter) throws IOException {
     

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java Thu Aug 16 11:45:49 2007
@@ -42,7 +42,8 @@
 /**
  * Convert Map/Reduce output and write it to an HBase table
  */
-public class TableOutputFormat extends OutputFormatBase {
+public class TableOutputFormat
+  extends OutputFormatBase<Text, KeyedDataArrayWritable> {
 
   /** JobConf parameter that specifies the output table */
   public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
@@ -56,7 +57,9 @@
    * Convert Reduce output (key, value) to (HStoreKey, KeyedDataArrayWritable) 
    * and write to an HBase table
    */
-  protected class TableRecordWriter implements RecordWriter {
+  protected class TableRecordWriter
+    implements RecordWriter<Text, KeyedDataArrayWritable> {
+    
     private HTable m_table;
 
     /**
@@ -77,10 +80,10 @@
      *
      * @see org.apache.hadoop.mapred.RecordWriter#write(org.apache.hadoop.io.WritableComparable, org.apache.hadoop.io.Writable)
      */
-    public void write(WritableComparable key, Writable value) throws IOException {
+    public void write(Text key, KeyedDataArrayWritable value) throws IOException {
       LOG.debug("start write");
-      Text tKey = (Text)key;
-      KeyedDataArrayWritable tValue = (KeyedDataArrayWritable) value;
+      Text tKey = key;
+      KeyedDataArrayWritable tValue = value;
       KeyedData[] columns = tValue.get();
 
       // start transaction

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java Thu Aug 16 11:45:49 2007
@@ -39,7 +39,7 @@
  * @see StreamLineRecordReader
  * @see StreamXmlRecordReader 
  */
-public abstract class StreamBaseRecordReader implements RecordReader {
+public abstract class StreamBaseRecordReader implements RecordReader<Text, Text> {
 
   protected static final Log LOG = LogFactory.getLog(StreamBaseRecordReader.class.getName());
 
@@ -65,7 +65,7 @@
 
   /** Read a record. Implementation should call numRecStats at the end
    */
-  public abstract boolean next(Writable key, Writable value) throws IOException;
+  public abstract boolean next(Text key, Text value) throws IOException;
 
   /** This implementation always returns true. */
   public void validateInput(JobConf job) throws IOException {
@@ -89,11 +89,11 @@
     }
   }
   
-  public WritableComparable createKey() {
+  public Text createKey() {
     return new Text();
   }
 
-  public Writable createValue() {
+  public Text createValue() {
     return new Text();
   }
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java Thu Aug 16 11:45:49 2007
@@ -23,6 +23,7 @@
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.Text;
 
 import org.apache.hadoop.mapred.*;
 
@@ -33,7 +34,8 @@
  */
 public class StreamInputFormat extends KeyValueTextInputFormat {
 
-  public RecordReader getRecordReader(final InputSplit genericSplit,
+  @SuppressWarnings("unchecked")
+  public RecordReader<Text, Text> getRecordReader(final InputSplit genericSplit,
                                       JobConf job, Reporter reporter) throws IOException {
     String c = job.get("stream.recordreader.class");
     if (c == null || c.indexOf("LineRecordReader") >= 0) {
@@ -67,9 +69,9 @@
       throw new RuntimeException(nsm);
     }
 
-    RecordReader reader;
+    RecordReader<Text, Text> reader;
     try {
-      reader = (RecordReader) ctor.newInstance(new Object[] { in, split,
+      reader = (RecordReader<Text, Text>) ctor.newInstance(new Object[] { in, split,
                                                               reporter, job, fs });
     } catch (Exception nsm) {
       throw new RuntimeException(nsm);

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java Thu Aug 16 11:45:49 2007
@@ -24,6 +24,7 @@
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.mapred.Reporter;
@@ -75,7 +76,7 @@
   
   int numNext = 0;
 
-  public synchronized boolean next(Writable key, Writable value) throws IOException {
+  public synchronized boolean next(Text key, Text value) throws IOException {
     long pos = in_.getPos();
     numNext++;
     if (pos >= end_) {
@@ -96,8 +97,8 @@
 
     numRecStats(record, 0, record.length);
 
-    ((Text) key).set(record);
-    ((Text) value).set("");
+    key.set(record);
+    value.set("");
 
     /*if (numNext < 5) {
       System.out.println("@@@ " + numNext + ". true next k=|" + key.toString().replaceAll("[\\r\\n]", " ")

Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/AggregateWordCount.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/AggregateWordCount.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/AggregateWordCount.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/AggregateWordCount.java Thu Aug 16 11:45:49 2007
@@ -27,7 +27,6 @@
 import java.io.IOException;
 
 import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.lib.aggregate.*;
 
 /**
@@ -43,13 +42,14 @@
 
   public static class WordCountPlugInClass extends
       ValueAggregatorBaseDescriptor {
-    public ArrayList<Entry> generateKeyValPairs(Object key, Object val) {
+    public ArrayList<Entry<Text, Text>> generateKeyValPairs(Object key,
+                                                            Object val) {
       String countType = LONG_VALUE_SUM;
-      ArrayList<Entry> retv = new ArrayList<Entry>();
+      ArrayList<Entry<Text, Text>> retv = new ArrayList<Entry<Text, Text>>();
       String line = val.toString();
       StringTokenizer itr = new StringTokenizer(line);
       while (itr.hasMoreTokens()) {
-        Entry e = generateEntry(countType, itr.nextToken(), ONE);
+        Entry<Text, Text> e = generateEntry(countType, itr.nextToken(), ONE);
         if (e != null) {
           retv.add(e);
         }

Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java Thu Aug 16 11:45:49 2007
@@ -41,7 +41,8 @@
   * Mapper class for Pi estimation.
    */
   
-  public static class PiMapper extends MapReduceBase implements Mapper {
+  public static class PiMapper extends MapReduceBase
+    implements Mapper<LongWritable, Writable, LongWritable, LongWritable> {
     
     /** Mapper configuration.
      *
@@ -60,11 +61,11 @@
      * @param out
      * @param reporter
      */
-    public void map(WritableComparable key,
+    public void map(LongWritable key,
                     Writable val,
-                    OutputCollector out,
+                    OutputCollector<LongWritable, LongWritable> out,
                     Reporter reporter) throws IOException {
-      long nSamples = ((LongWritable) key).get();
+      long nSamples = key.get();
       for(long idx = 0; idx < nSamples; idx++) {
         double x = r.nextDouble();
         double y = r.nextDouble();
@@ -87,7 +88,9 @@
     }
   }
   
-  public static class PiReducer extends MapReduceBase implements Reducer {
+  public static class PiReducer extends MapReduceBase
+    implements Reducer<LongWritable, LongWritable, WritableComparable, Writable> {
+    
     long numInside = 0;
     long numOutside = 0;
     JobConf conf;
@@ -104,18 +107,18 @@
      * @param output
      * @param reporter
      */
-    public void reduce(WritableComparable key,
-                       Iterator values,
-                       OutputCollector output,
+    public void reduce(LongWritable key,
+                       Iterator<LongWritable> values,
+                       OutputCollector<WritableComparable, Writable> output,
                        Reporter reporter) throws IOException {
-      if (((LongWritable)key).get() == 1) {
+      if (key.get() == 1) {
         while (values.hasNext()) {
-          long num = ((LongWritable)values.next()).get();
+          long num = values.next().get();
           numInside += num;
         }
       } else {
         while (values.hasNext()) {
-          long num = ((LongWritable)values.next()).get();
+          long num = values.next().get();
           numOutside += num;
         }
       }

Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java Thu Aug 16 11:45:49 2007
@@ -75,7 +75,7 @@
    * A custom input format that creates virtual inputs of a single string
    * for each map.
    */
-  static class RandomInputFormat implements InputFormat {
+  static class RandomInputFormat implements InputFormat<Text, Text> {
     
     /** Accept all job confs */
     public void validateInput(JobConf job) throws IOException {
@@ -99,23 +99,23 @@
      * Return a single record (filename, "") where the filename is taken from
      * the file split.
      */
-    static class RandomRecordReader implements RecordReader {
+    static class RandomRecordReader implements RecordReader<Text, Text> {
       Path name;
       public RandomRecordReader(Path p) {
         name = p;
       }
-      public boolean next(Writable key, Writable value) {
+      public boolean next(Text key, Text value) {
         if (name != null) {
-          ((Text) key).set(name.getName());
+          key.set(name.getName());
           name = null;
           return true;
         }
         return false;
       }
-      public WritableComparable createKey() {
+      public Text createKey() {
         return new Text();
       }
-      public Writable createValue() {
+      public Text createValue() {
         return new Text();
       }
       public long getPos() {
@@ -127,14 +127,17 @@
       }
     }
 
-    public RecordReader getRecordReader(InputSplit split,
+    public RecordReader<Text, Text> getRecordReader(InputSplit split,
                                         JobConf job, 
                                         Reporter reporter) throws IOException {
       return new RandomRecordReader(((FileSplit) split).getPath());
     }
   }
 
-  static class Map extends MapReduceBase implements Mapper {
+  static class Map extends MapReduceBase
+    implements Mapper<WritableComparable, Writable,
+                      BytesWritable, BytesWritable> {
+    
     private long numBytesToWrite;
     private int minKeySize;
     private int keySizeRange;
@@ -155,7 +158,7 @@
      */
     public void map(WritableComparable key, 
                     Writable value,
-                    OutputCollector output, 
+                    OutputCollector<BytesWritable, BytesWritable> output, 
                     Reporter reporter) throws IOException {
       int itemCount = 0;
       while (numBytesToWrite > 0) {

Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java Thu Aug 16 11:45:49 2007
@@ -23,9 +23,8 @@
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
@@ -50,15 +49,16 @@
    * For each line of input, break the line into words and emit them as
    * (<b>word</b>, <b>1</b>).
    */
-  public static class MapClass extends MapReduceBase implements Mapper {
+  public static class MapClass extends MapReduceBase
+    implements Mapper<LongWritable, Text, Text, IntWritable> {
     
     private final static IntWritable one = new IntWritable(1);
     private Text word = new Text();
     
-    public void map(WritableComparable key, Writable value, 
-                    OutputCollector output, 
+    public void map(LongWritable key, Text value, 
+                    OutputCollector<Text, IntWritable> output, 
                     Reporter reporter) throws IOException {
-      String line = ((Text)value).toString();
+      String line = value.toString();
       StringTokenizer itr = new StringTokenizer(line);
       while (itr.hasMoreTokens()) {
         word.set(itr.nextToken());
@@ -70,14 +70,15 @@
   /**
    * A reducer class that just emits the sum of the input values.
    */
-  public static class Reduce extends MapReduceBase implements Reducer {
+  public static class Reduce extends MapReduceBase
+    implements Reducer<Text, IntWritable, Text, IntWritable> {
     
-    public void reduce(WritableComparable key, Iterator values,
-                       OutputCollector output, 
+    public void reduce(Text key, Iterator<IntWritable> values,
+                       OutputCollector<Text, IntWritable> output, 
                        Reporter reporter) throws IOException {
       int sum = 0;
       while (values.hasNext()) {
-        sum += ((IntWritable) values.next()).get();
+        sum += values.next().get();
       }
       output.collect(key, new IntWritable(sum));
     }

Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/dancing/DistributedPentomino.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/dancing/DistributedPentomino.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/dancing/DistributedPentomino.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/dancing/DistributedPentomino.java Thu Aug 16 11:45:49 2007
@@ -24,7 +24,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
@@ -46,13 +45,15 @@
    * the solutions that start with that prefix. The output is the prefix as
    * the key and the solution as the value.
    */
-  public static class PentMap extends MapReduceBase implements Mapper {
+  public static class PentMap extends MapReduceBase
+    implements Mapper<WritableComparable, Text, Text, Text> {
+    
     private int width;
     private int height;
     private int depth;
     private Pentomino pent;
     private Text prefixString;
-    private OutputCollector output;
+    private OutputCollector<Text, Text> output;
     private Reporter reporter;
     
     /**
@@ -81,12 +82,12 @@
      * will be selected for each column in order). Find all solutions with
      * that prefix.
      */
-    public void map(WritableComparable key, Writable value,
-                    OutputCollector output, Reporter reporter
+    public void map(WritableComparable key, Text value,
+                    OutputCollector<Text, Text> output, Reporter reporter
                     ) throws IOException {
       this.output = output;
       this.reporter = reporter;
-      prefixString = (Text) value;
+      prefixString = value;
       StringTokenizer itr = new StringTokenizer(prefixString.toString(), ",");
       int[] prefix = new int[depth];
       int idx = 0;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java Thu Aug 16 11:45:49 2007
@@ -28,12 +28,16 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 
 /** 
  * A base class for {@link InputFormat}. 
  * 
  */
-public abstract class FileInputFormat implements InputFormat {
+public abstract class FileInputFormat<K extends WritableComparable,
+                                      V extends Writable>
+  implements InputFormat<K, V> {
 
   public static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.mapred.FileInputFormat");
@@ -62,7 +66,7 @@
     return true;
   }
   
-  public abstract RecordReader getRecordReader(InputSplit split,
+  public abstract RecordReader<K, V> getRecordReader(InputSplit split,
                                                JobConf job,
                                                Reporter reporter)
     throws IOException;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java Thu Aug 16 11:45:49 2007
@@ -22,12 +22,15 @@
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 
 /** An input data format.  Input files are stored in a {@link FileSystem}.
  * The processing of an input file may be split across multiple machines.
  * Files are processed as sequences of records, implementing {@link
  * RecordReader}.  Files must thus be split on record boundaries. */
-public interface InputFormat {
+public interface InputFormat<K extends WritableComparable,
+                             V extends Writable> {
 
   /**
    * Are the input directories valid? This method is used to test the input
@@ -52,8 +55,8 @@
    * @param job the job that this split belongs to
    * @return a {@link RecordReader}
    */
-  RecordReader getRecordReader(InputSplit split,
-                               JobConf job, 
-                               Reporter reporter) throws IOException;
+  RecordReader<K, V> getRecordReader(InputSplit split,
+                                     JobConf job, 
+                                     Reporter reporter) throws IOException;
 }
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java Thu Aug 16 11:45:49 2007
@@ -21,9 +21,8 @@
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 
 /**
  * This class treats a line in the input as a key/value pair separated by a 
@@ -31,23 +30,32 @@
  * under the attribute name key.value.separator.in.input.line. The default
  * separator is the tab character ('\t').
  */
-public class KeyValueLineRecordReader extends LineRecordReader {
+public class KeyValueLineRecordReader implements RecordReader<Text, Text> {
+  
+  private final LineRecordReader lineRecordReader;
 
   private byte separator = (byte) '\t';
 
-  private WritableComparable dummyKey = super.createKey();
+  private LongWritable dummyKey;
 
-  private Text innerValue = (Text) super.createValue();
+  private Text innerValue;
 
   public Class getKeyClass() { return Text.class; }
   
   public Text createKey() {
     return new Text();
   }
+  
+  public Text createValue() {
+    return new Text();
+  }
 
   public KeyValueLineRecordReader(Configuration job, FileSplit split)
     throws IOException {
-    super(job, split);
+    
+    lineRecordReader = new LineRecordReader(job, split);
+    dummyKey = lineRecordReader.createKey();
+    innerValue = lineRecordReader.createValue();
     String sepStr = job.get("key.value.separator.in.input.line", "\t");
     this.separator = (byte) sepStr.charAt(0);
   }
@@ -62,13 +70,13 @@
   }
 
   /** Read key/value pair in a line. */
-  public synchronized boolean next(Writable key, Writable value)
+  public synchronized boolean next(Text key, Text value)
     throws IOException {
-    Text tKey = (Text) key;
-    Text tValue = (Text) value;
+    Text tKey = key;
+    Text tValue = value;
     byte[] line = null;
     int lineLen = -1;
-    if (super.next(dummyKey, innerValue)) {
+    if (lineRecordReader.next(dummyKey, innerValue)) {
       line = innerValue.getBytes();
       lineLen = innerValue.getLength();
     } else {
@@ -91,5 +99,17 @@
       tValue.set(valBytes);
     }
     return true;
+  }
+  
+  public float getProgress() {
+    return lineRecordReader.getProgress();
+  }
+  
+  public synchronized long getPos() throws IOException {
+    return lineRecordReader.getPos();
+  }
+
+  public synchronized void close() throws IOException { 
+    lineRecordReader.close();
   }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java Thu Aug 16 11:45:49 2007
@@ -20,16 +20,35 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+
 /**
  * An {@link InputFormat} for plain text files. Files are broken into lines.
  * Either linefeed or carriage-return are used to signal end of line. Each line
  * is divided into key and value parts by a separator byte. If no such a byte
  * exists, the key will be the entire line and value will be empty.
  */
-public class KeyValueTextInputFormat extends TextInputFormat {
+public class KeyValueTextInputFormat extends FileInputFormat<Text, Text>
+  implements JobConfigurable {
 
-  public RecordReader getRecordReader(InputSplit genericSplit, JobConf job,
-                                      Reporter reporter) throws IOException {
+  private CompressionCodecFactory compressionCodecs = null;
+  
+  public void configure(JobConf conf) {
+    compressionCodecs = new CompressionCodecFactory(conf);
+  }
+  
+  protected boolean isSplitable(FileSystem fs, Path file) {
+    return compressionCodecs.getCodec(file) == null;
+  }
+  
+  public RecordReader<Text, Text> getRecordReader(InputSplit genericSplit,
+                                                  JobConf job,
+                                                  Reporter reporter)
+    throws IOException {
+    
     reporter.setStatus(genericSplit.toString());
     return new KeyValueLineRecordReader(job, (FileSplit) genericSplit);
   }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java Thu Aug 16 11:45:49 2007
@@ -30,15 +30,13 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 
 /**
  * Treats keys as offset in file and value as line. 
  */
-public class LineRecordReader implements RecordReader {
+public class LineRecordReader implements RecordReader<LongWritable, Text> {
   private CompressionCodecFactory compressionCodecs = null;
   private long start; 
   private long pos;
@@ -100,28 +98,28 @@
     //    readLine(in, null); 
   }
   
-  public WritableComparable createKey() {
+  public LongWritable createKey() {
     return new LongWritable();
   }
   
-  public Writable createValue() {
+  public Text createValue() {
     return new Text();
   }
   
   /** Read a line. */
-  public synchronized boolean next(Writable key, Writable value)
+  public synchronized boolean next(LongWritable key, Text value)
     throws IOException {
     if (pos >= end)
       return false;
 
-    ((LongWritable)key).set(pos);           // key is position
+    key.set(pos);           // key is position
     buffer.reset();
     long bytesRead = readLine();
     if (bytesRead == 0) {
       return false;
     }
     pos += bytesRead;
-    bridge.target = (Text) value;
+    bridge.target = value;
     buffer.writeTo(bridge);
     return true;
   }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java Thu Aug 16 11:45:49 2007
@@ -78,10 +78,11 @@
   }
     
   /** Get an entry from output generated by this class. */
-  public static Writable getEntry(MapFile.Reader[] readers,
-                                  Partitioner partitioner,
-                                  WritableComparable key,
-                                  Writable value) throws IOException {
+  public static <K extends WritableComparable, V extends Writable>
+  Writable getEntry(MapFile.Reader[] readers,
+                                  Partitioner<K, V> partitioner,
+                                  K key,
+                                  V value) throws IOException {
     int part = partitioner.getPartition(key, value, readers.length);
     return readers[part].get(key, value);
   }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunnable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunnable.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunnable.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunnable.java Thu Aug 16 11:45:49 2007
@@ -20,13 +20,20 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
 /** Expert: Permits greater control of map processing. For example,
  * implementations might perform multi-threaded, asynchronous mappings. */
-public interface MapRunnable extends JobConfigurable {
+public interface MapRunnable<K1 extends WritableComparable, V1 extends Writable,
+                             K2 extends WritableComparable, V2 extends Writable>
+    extends JobConfigurable {
+  
   /** Called to execute mapping.  Mapping is complete when this returns.
    * @param input the {@link RecordReader} with input key/value pairs.
    * @param output the {@link OutputCollector} for mapped key/value pairs.
    */
-  void run(RecordReader input, OutputCollector output, Reporter reporter)
+  void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
+           Reporter reporter)
     throws IOException;
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunner.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapRunner.java Thu Aug 16 11:45:49 2007
@@ -25,21 +25,25 @@
 import org.apache.hadoop.util.ReflectionUtils;
 
 /** Default {@link MapRunnable} implementation.*/
-public class MapRunner implements MapRunnable {
-  private Mapper mapper;
+public class MapRunner<K1 extends WritableComparable, V1 extends Writable,
+                       K2 extends WritableComparable, V2 extends Writable>
+    implements MapRunnable<K1, V1, K2, V2> {
+  
+  private Mapper<K1, V1, K2, V2> mapper;
 
+  @SuppressWarnings("unchecked")
   public void configure(JobConf job) {
     this.mapper = (Mapper)ReflectionUtils.newInstance(job.getMapperClass(),
                                                       job);
   }
 
-  public void run(RecordReader input, OutputCollector output,
+  public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                   Reporter reporter)
     throws IOException {
     try {
       // allocate key & value instances that are re-used for all entries
-      WritableComparable key = input.createKey();
-      Writable value = input.createValue();
+      K1 key = input.createKey();
+      V1 value = input.createValue();
       
       while (input.next(key, value)) {
         // map pair to output

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Thu Aug 16 11:45:49 2007
@@ -112,6 +112,7 @@
     return instantiatedSplit;
   }
 
+  @SuppressWarnings("unchecked")
   public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
 
@@ -163,7 +164,7 @@
           return rawIn.createValue();
         }
          
-        public synchronized boolean next(Writable key, Writable value)
+        public synchronized boolean next(WritableComparable key, Writable value)
           throws IOException {
 
           setProgress(getProgress());
@@ -196,7 +197,9 @@
     done(umbilical);
   }
 
-  interface MapOutputCollector extends OutputCollector {
+  interface MapOutputCollector<K extends WritableComparable,
+                               V extends Writable>
+    extends OutputCollector<K, V> {
 
     public void close() throws IOException;
     
@@ -204,12 +207,15 @@
         
   }
 
-  class DirectMapOutputCollector implements MapOutputCollector {
-
-    private RecordWriter out = null;
+  class DirectMapOutputCollector<K extends WritableComparable,
+                                 V extends Writable>
+    implements MapOutputCollector<K, V> {
+ 
+    private RecordWriter<K, V> out = null;
 
     private Reporter reporter = null;
 
+    @SuppressWarnings("unchecked")
     public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical,
         JobConf job, Reporter reporter) throws IOException {
       this.reporter = reporter;
@@ -231,7 +237,7 @@
       
     }
 
-    public void collect(WritableComparable key, Writable value) throws IOException {
+    public void collect(K key, V value) throws IOException {
       this.out.write(key, value);
     }
     
@@ -315,6 +321,7 @@
       indexOut.writeLong(out.getPos()-segmentStart);
     }
     
+    @SuppressWarnings("unchecked")
     public void collect(WritableComparable key,
                         Writable value) throws IOException {
       
@@ -420,6 +427,7 @@
       }
     }
     
+    @SuppressWarnings("unchecked")
     private void combineAndSpill(RawKeyValueIterator resultIter, 
                                  Reducer combiner, OutputCollector combineCollector) throws IOException {
       //combine the key/value obtained from the offset & indices arrays.

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Mapper.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Mapper.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Mapper.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Mapper.java Thu Aug 16 11:45:49 2007
@@ -28,7 +28,10 @@
  * intermediate values associated with a given output key are subsequently
  * grouped by the map/reduce system, and passed to a {@link Reducer} to
 * determine the final output. */
-public interface Mapper extends JobConfigurable, Closeable {
+public interface Mapper<K1 extends WritableComparable, V1 extends Writable,
+                        K2 extends WritableComparable, V2 extends Writable>
+  extends JobConfigurable, Closeable {
+  
   /** Maps a single input key/value pair into intermediate key/value pairs.
    * Output pairs need not be of the same types as input pairs.  A given input
    * pair may map to zero or many output pairs.  Output pairs are collected
@@ -39,7 +42,7 @@
    * @param value the values
    * @param output collects mapped keys and values
    */
-  void map(WritableComparable key, Writable value,
-           OutputCollector output, Reporter reporter)
+  void map(K1 key, V1 value,
+           OutputCollector<K2, V2> output, Reporter reporter)
     throws IOException;
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java Thu Aug 16 11:45:49 2007
@@ -22,6 +22,8 @@
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 
 /**
  * An abstract {@link InputFormat} that returns {@link MultiFileSplit}'s
@@ -32,7 +34,9 @@
  * to construct <code>RecordReader</code>'s for <code>MultiFileSplit</code>'s.
  * @see MultiFileSplit
  */
-public abstract class MultiFileInputFormat extends FileInputFormat {
+public abstract class MultiFileInputFormat<K extends WritableComparable,
+                                           V extends Writable>
+  extends FileInputFormat<K, V> {
 
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) 
@@ -87,7 +91,7 @@
     return lengths.length - startIndex;
   }
   
-  public abstract RecordReader getRecordReader(InputSplit split,
+  public abstract RecordReader<K, V> getRecordReader(InputSplit split,
       JobConf job, Reporter reporter)
       throws IOException;
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputCollector.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputCollector.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputCollector.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputCollector.java Thu Aug 16 11:45:49 2007
@@ -26,11 +26,13 @@
 
 /** Passed to {@link Mapper} and {@link Reducer} implementations to collect
  * output data. */
-public interface OutputCollector {
+public interface OutputCollector<K extends WritableComparable,
+                                 V extends Writable> {
+  
   /** Adds a key/value pair to the output.
    *
    * @param key the key to add
   * @param value the value to add
    */
-  void collect(WritableComparable key, Writable value) throws IOException;
+  void collect(K key, V value) throws IOException;
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java Thu Aug 16 11:45:49 2007
@@ -21,11 +21,14 @@
 import java.io.IOException;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.Progressable;
 
 /** An output data format.  Output files are stored in a {@link
  * FileSystem}. */
-public interface OutputFormat {
+public interface OutputFormat<K extends WritableComparable,
+                              V extends Writable> {
 
   /** Construct a {@link RecordWriter} with Progressable.
    *
@@ -34,8 +37,8 @@
    * @param progress mechanism for reporting progress while writing to file
    * @return a {@link RecordWriter}
    */
-  RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name,
-                               Progressable progress)
+  RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
+                                     String name, Progressable progress)
     throws IOException;
 
   /** Check whether the output specification for a job is appropriate.  Called

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java Thu Aug 16 11:45:49 2007
@@ -22,11 +22,15 @@
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.util.Progressable;
 
 /** A base class for {@link OutputFormat}. */
-public abstract class OutputFormatBase implements OutputFormat {
+public abstract class OutputFormatBase<K extends WritableComparable,
+                                       V extends Writable>
+  implements OutputFormat<K, V> {
 
   /**
    * Set whether the output of the reduce is compressed
@@ -79,7 +83,7 @@
     }
   }
   
-  public abstract RecordWriter getRecordWriter(FileSystem ignored,
+  public abstract RecordWriter<K, V> getRecordWriter(FileSystem ignored,
                                                JobConf job, String name,
                                                Progressable progress)
     throws IOException;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Partitioner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Partitioner.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Partitioner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Partitioner.java Thu Aug 16 11:45:49 2007
@@ -22,7 +22,10 @@
 import org.apache.hadoop.io.WritableComparable;
 
 /** Partitions the key space.  A partition is created for each reduce task. */
-public interface Partitioner extends JobConfigurable {
+public interface Partitioner<K2 extends WritableComparable,
+                             V2 extends Writable>
+  extends JobConfigurable {
+  
  /** Returns the partition number for a given entry, given the total number of
   * partitions.  Typically a hash function on all or a subset of the key.
    *
@@ -31,5 +34,5 @@
    * @param numPartitions the number of partitions
    * @return the partition number
    */
-  int getPartition(WritableComparable key, Writable value, int numPartitions);
+  int getPartition(K2 key, V2 value, int numPartitions);
 }
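
The same pattern applies to the generified Partitioner; a minimal sketch under the new signature (illustrative only; the real HashPartitioner diff is in a later part of this commit):

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Partitioner;

    // Partitions on the key's hash, ignoring the value; K2/V2 are fixed to
    // Text/IntWritable here for illustration.
    public class TextHashPartitioner implements Partitioner<Text, IntWritable> {
      public void configure(JobConf job) {}
      public int getPartition(Text key, IntWritable value, int numPartitions) {
        // mask the sign bit so the modulus is non-negative
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
      }
    }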

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordReader.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordReader.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordReader.java Thu Aug 16 11:45:49 2007
@@ -26,7 +26,8 @@
 
 /** Reads key/value pairs from an input file {@link FileSplit}.
  * Implemented by {@link InputFormat} implementations. */
-public interface RecordReader {
+public interface RecordReader<K extends WritableComparable,
+                              V extends Writable> {
   /** Reads the next key/value pair.
    *
    * @param key the key to read data into
@@ -35,19 +36,19 @@
    *
    * @see Writable#readFields(DataInput)
    */      
-  boolean next(Writable key, Writable value) throws IOException;
+  boolean next(K key, V value) throws IOException;
   
   /**
    * Create an object of the appropriate type to be used as a key.
    * @return a new key object
    */
-  WritableComparable createKey();
+  K createKey();
   
   /**
    * Create an object of the appropriate type to be used as the value.
    * @return a new value object
    */
-  Writable createValue();
+  V createValue();
 
   /** Returns the current position in the input. */
   long getPos() throws IOException;
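
With the reader typed, createKey() and createValue() return K and V directly
and the old downcasts from Writable disappear. A minimal consumer sketch,
assuming a reader over LongWritable/Text pairs (the helper class is
hypothetical):

  import java.io.IOException;

  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.RecordReader;

  public class ReaderDrain {
    // Reads every pair from a typed reader and returns the record count.
    static long drain(RecordReader<LongWritable, Text> reader)
        throws IOException {
      LongWritable key = reader.createKey();
      Text value = reader.createValue();
      long records = 0;
      while (reader.next(key, value)) {
        records++;
      }
      reader.close();
      return records;
    }
  }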

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordWriter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordWriter.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordWriter.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordWriter.java Thu Aug 16 11:45:49 2007
@@ -26,7 +26,8 @@
 
 /** Writes key/value pairs to an output file.  Implemented by {@link
  * OutputFormat} implementations. */
-public interface RecordWriter {
+public interface RecordWriter<K extends WritableComparable,
+                              V extends Writable> {
   /** Writes a key/value pair.
    *
    * @param key the key to write
@@ -34,7 +35,7 @@
    *
    * @see Writable#write(DataOutput)
    */      
-  void write(WritableComparable key, Writable value) throws IOException;
+  void write(K key, V value) throws IOException;
 
   /** Close this to future operations.*/ 
   void close(Reporter reporter) throws IOException;
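
Symmetrically, write() now rejects mismatched key/value classes at compile
time instead of failing at runtime. A minimal sketch (the helper class is
hypothetical, and Reporter.NULL is assumed to be available):

  import java.io.IOException;

  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.RecordWriter;
  import org.apache.hadoop.mapred.Reporter;

  public class WriterDemo {
    // Emits one typed pair and closes the writer.
    static void emit(RecordWriter<Text, IntWritable> writer)
        throws IOException {
      writer.write(new Text("count"), new IntWritable(1));
      writer.close(Reporter.NULL);  // assumption: no live reporter needed here
    }
  }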

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Thu Aug 16 11:45:49 2007
@@ -235,6 +235,7 @@
     }
   }
 
+  @SuppressWarnings("unchecked")
   public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
     Reducer reducer = (Reducer)ReflectionUtils.newInstance(
@@ -299,6 +300,7 @@
       job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);  
     
     OutputCollector collector = new OutputCollector() {
+        @SuppressWarnings("unchecked")
         public void collect(WritableComparable key, Writable value)
           throws IOException {
           out.write(key, value);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reducer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reducer.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reducer.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reducer.java Thu Aug 16 11:45:49 2007
@@ -28,7 +28,10 @@
 
 /** Reduces a set of intermediate values which share a key to a smaller set of
  * values.  Input values are the grouped output of a {@link Mapper}. */
-public interface Reducer extends JobConfigurable, Closeable {
+public interface Reducer<K2 extends WritableComparable, V2 extends Writable,
+                         K3 extends WritableComparable, V3 extends Writable>
+    extends JobConfigurable, Closeable {
+  
   /** Combines values for a given key.  Output values must be of the same type
    * as input values.  Input keys must not be altered.  Typically all values
    * are combined into zero or one value.  Output pairs are collected with
@@ -38,8 +41,8 @@
    * @param values the values to combine
    * @param output to collect combined values
    */
-  void reduce(WritableComparable key, Iterator values,
-              OutputCollector output, Reporter reporter)
+  void reduce(K2 key, Iterator<V2> values,
+              OutputCollector<K3, V3> output, Reporter reporter)
     throws IOException;
 
 }
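
Against the four-parameter interface, a user reducer declares its input and
output pairs explicitly and values.next() needs no cast. A sketch computing
a per-key maximum (the class is illustrative, not part of this commit):

  import java.io.IOException;
  import java.util.Iterator;

  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.MapReduceBase;
  import org.apache.hadoop.mapred.OutputCollector;
  import org.apache.hadoop.mapred.Reducer;
  import org.apache.hadoop.mapred.Reporter;

  public class MaxReducer extends MapReduceBase
      implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output,
                       Reporter reporter) throws IOException {
      int max = Integer.MIN_VALUE;
      while (values.hasNext()) {
        max = Math.max(max, values.next().get());  // typed, no cast needed
      }
      output.collect(key, new IntWritable(max));
    }
  }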

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java Thu Aug 16 11:45:49 2007
@@ -20,18 +20,23 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.io.Text;
+
 /**
  * This class is similar to SequenceFileInputFormat, except it generates SequenceFileAsTextRecordReader 
  * which converts the input keys and values to their String forms by calling toString() method. 
  */
-public class SequenceFileAsTextInputFormat extends SequenceFileInputFormat {
+public class SequenceFileAsTextInputFormat
+  extends SequenceFileInputFormat<Text, Text> {
 
   public SequenceFileAsTextInputFormat() {
     super();
   }
 
-  public RecordReader getRecordReader(InputSplit split, JobConf job,
-                                      Reporter reporter) throws IOException {
+  public RecordReader<Text, Text> getRecordReader(InputSplit split,
+                                                  JobConf job,
+                                                  Reporter reporter)
+    throws IOException {
 
     reporter.setStatus(split.toString());
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java Thu Aug 16 11:45:49 2007
@@ -30,34 +30,53 @@
  * method. This class is to SequenceFileAsTextInputFormat as LineRecordReader
  * is to TextInputFormat.
  */
-public class SequenceFileAsTextRecordReader extends SequenceFileRecordReader {
+public class SequenceFileAsTextRecordReader
+  implements RecordReader<Text, Text> {
+  
+  private final SequenceFileRecordReader<WritableComparable, Writable>
+  sequenceFileRecordReader;
 
-  private Writable innerKey = super.createKey();
-  private Writable innerValue = super.createValue();
+  private WritableComparable innerKey;
+  private Writable innerValue;
 
   public SequenceFileAsTextRecordReader(Configuration conf, FileSplit split)
     throws IOException {
-    super(conf, split);
+    sequenceFileRecordReader =
+      new SequenceFileRecordReader<WritableComparable, Writable>(conf, split);
+    innerKey = sequenceFileRecordReader.createKey();
+    innerValue = sequenceFileRecordReader.createValue();
   }
 
-  public WritableComparable createKey() {
+  public Text createKey() {
     return new Text();
   }
   
-  public Writable createValue() {
+  public Text createValue() {
     return new Text();
   }
 
   /** Read key/value pair in a line. */
-  public synchronized boolean next(Writable key, Writable value)
-    throws IOException {
-    Text tKey = (Text) key;
-    Text tValue = (Text) value;
-    if (!super.next(innerKey, innerValue)) {
+  public synchronized boolean next(Text key, Text value) throws IOException {
+    Text tKey = key;
+    Text tValue = value;
+    if (!sequenceFileRecordReader.next(innerKey, innerValue)) {
       return false;
     }
     tKey.set(innerKey.toString());
     tValue.set(innerValue.toString());
     return true;
   }
+  
+  public float getProgress() throws IOException {
+    return sequenceFileRecordReader.getProgress();
+  }
+  
+  public synchronized long getPos() throws IOException {
+    return sequenceFileRecordReader.getPos();
+  }
+  
+  public synchronized void close() throws IOException {
+    sequenceFileRecordReader.close();
+  }
+  
 }
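
Since the reader converts every stored key and value to Text via toString(),
a job can consume arbitrary sequence files with Text-typed mappers. A
configuration sketch (illustrative only):

  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.SequenceFileAsTextInputFormat;
  import org.apache.hadoop.mapred.lib.IdentityMapper;

  public class AsTextSetup {
    static void configure(JobConf job) {
      // Keys and values arrive as Text regardless of the Writable
      // types actually stored in the input sequence files.
      job.setInputFormat(SequenceFileAsTextInputFormat.class);
      job.setMapperClass(IdentityMapper.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
    }
  }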

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java Thu Aug 16 11:45:49 2007
@@ -33,6 +33,7 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /**
@@ -41,7 +42,10 @@
  * 
  */
 
-public class SequenceFileInputFilter extends SequenceFileInputFormat {
+public class SequenceFileInputFilter<K extends WritableComparable,
+                                     V extends Writable>
+  extends SequenceFileInputFormat<K, V> {
+  
   final private static String FILTER_CLASS = "sequencefile.filter.class";
   final private static String FILTER_FREQUENCY
     = "sequencefile.filter.frequency";
@@ -56,13 +60,13 @@
    * @param reporter reporter who sends report to task tracker
    * @return RecordReader
    */
-  public RecordReader getRecordReader(InputSplit split,
+  public RecordReader<K, V> getRecordReader(InputSplit split,
                                       JobConf job, Reporter reporter)
     throws IOException {
         
     reporter.setStatus(split.toString());
         
-    return new FilterRecordReader(job, (FileSplit) split);
+    return new FilterRecordReader<K, V>(job, (FileSplit) split);
   }
 
 
@@ -278,7 +282,10 @@
     }
   }
     
-  private static class FilterRecordReader extends SequenceFileRecordReader {
+  private static class FilterRecordReader<K extends WritableComparable,
+                                          V extends Writable>
+    extends SequenceFileRecordReader<K, V> {
+    
     private Filter filter;
         
     public FilterRecordReader(Configuration conf, FileSplit split)
@@ -290,8 +297,7 @@
                                                    conf);
     }
         
-    public synchronized boolean next(Writable key, Writable value)
-      throws IOException {
+    public synchronized boolean next(K key, V value) throws IOException {
       while (next(key)) {
         if (filter.accept(key)) {
           getCurrentValue(value);
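
A job selects the filter through the configuration keys defined above. A
sketch setting them directly; the PercentFilter class name is an assumption
here, so substitute whichever Filter implementation the job actually uses.

  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.SequenceFileInputFilter;

  public class FilterSetup {
    static void configure(JobConf job) {
      job.setInputFormat(SequenceFileInputFilter.class);
      // FILTER_CLASS key from above; the value names a Filter implementation
      // (hypothetical choice shown here).
      job.set("sequencefile.filter.class",
              "org.apache.hadoop.mapred.SequenceFileInputFilter$PercentFilter");
      // FILTER_FREQUENCY key from above; its meaning depends on the filter.
      job.setInt("sequencefile.filter.frequency", 10);
    }
  }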

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFormat.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFormat.java Thu Aug 16 11:45:49 2007
@@ -24,9 +24,13 @@
 
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 
 /** An {@link InputFormat} for {@link SequenceFile}s. */
-public class SequenceFileInputFormat extends FileInputFormat {
+public class SequenceFileInputFormat<K extends WritableComparable,
+                                     V extends Writable>
+  extends FileInputFormat<K, V> {
 
   public SequenceFileInputFormat() {
     setMinSplitSize(SequenceFile.SYNC_INTERVAL);
@@ -45,13 +49,13 @@
     return files;
   }
 
-  public RecordReader getRecordReader(InputSplit split,
+  public RecordReader<K, V> getRecordReader(InputSplit split,
                                       JobConf job, Reporter reporter)
     throws IOException {
 
     reporter.setStatus(split.toString());
 
-    return new SequenceFileRecordReader(job, (FileSplit) split);
+    return new SequenceFileRecordReader<K, V>(job, (FileSplit) split);
   }
 
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java Thu Aug 16 11:45:49 2007
@@ -28,7 +28,10 @@
 import org.apache.hadoop.util.ReflectionUtils;
 
 /** An {@link RecordReader} for {@link SequenceFile}s. */
-public class SequenceFileRecordReader implements RecordReader {
+public class SequenceFileRecordReader<K extends WritableComparable,
+                                      V extends Writable>
+  implements RecordReader<K, V> {
+  
   private SequenceFile.Reader in;
   private long start;
   private long end;
@@ -52,24 +55,25 @@
 
 
   /** The class of key that must be passed to {@link
-   * #next(Writable,Writable)}.. */
+   * #next(WritableComparable,Writable)}. */
   public Class getKeyClass() { return in.getKeyClass(); }
 
   /** The class of value that must be passed to {@link
-   * #next(Writable,Writable)}.. */
+   * #next(WritableComparable,Writable)}. */
   public Class getValueClass() { return in.getValueClass(); }
   
-  public WritableComparable createKey() {
-    return (WritableComparable) ReflectionUtils.newInstance(getKeyClass(), 
+  @SuppressWarnings("unchecked")
+  public K createKey() {
+    return (K) ReflectionUtils.newInstance(getKeyClass(), 
                                                             conf);
   }
   
-  public Writable createValue() {
-    return (Writable) ReflectionUtils.newInstance(getValueClass(), conf);
+  @SuppressWarnings("unchecked")
+  public V createValue() {
+    return (V) ReflectionUtils.newInstance(getValueClass(), conf);
   }
     
-  public synchronized boolean next(Writable key, Writable value)
-    throws IOException {
+  public synchronized boolean next(K key, V value) throws IOException {
     if (!more) return false;
     long pos = in.getPosition();
     boolean eof = in.next(key, value);
@@ -81,7 +85,7 @@
     return more;
   }
   
-  protected synchronized boolean next(Writable key)
+  protected synchronized boolean next(K key)
     throws IOException {
     if (!more) return false;
     long pos = in.getPosition();
@@ -94,7 +98,7 @@
     return more;
   }
   
-  protected synchronized void getCurrentValue(Writable value)
+  protected synchronized void getCurrentValue(V value)
     throws IOException {
     in.getCurrentValue(value);
   }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java Thu Aug 16 11:45:49 2007
@@ -21,12 +21,15 @@
 import java.io.*;
 
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.*;
 
 /** An {@link InputFormat} for plain text files.  Files are broken into lines.
  * Either linefeed or carriage-return is used to signal end of line.  Keys are
  * the position in the file, and values are the line of text. */
-public class TextInputFormat extends FileInputFormat implements JobConfigurable {
+public class TextInputFormat extends FileInputFormat<LongWritable, Text>
+  implements JobConfigurable {
 
   private CompressionCodecFactory compressionCodecs = null;
   
@@ -38,8 +41,11 @@
     return compressionCodecs.getCodec(file) == null;
   }
 
-  public RecordReader getRecordReader(InputSplit genericSplit, JobConf job,
-                                      Reporter reporter) throws IOException {
+  public RecordReader<LongWritable, Text> getRecordReader(
+                                          InputSplit genericSplit, JobConf job,
+                                          Reporter reporter)
+    throws IOException {
+    
     reporter.setStatus(genericSplit.toString());
     return new LineRecordReader(job, (FileSplit) genericSplit);
   }
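
TextInputFormat now fixes its pair types, so a mapper that consumes it can
declare them and get compile-time checking. An illustrative sketch that
emits each line keyed by itself with its byte length:

  import java.io.IOException;

  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.MapReduceBase;
  import org.apache.hadoop.mapred.Mapper;
  import org.apache.hadoop.mapred.OutputCollector;
  import org.apache.hadoop.mapred.Reporter;

  public class LineLengthMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, LongWritable> {

    // The key is the line's position in the file; the value is the line.
    public void map(LongWritable offset, Text line,
                    OutputCollector<Text, LongWritable> output,
                    Reporter reporter) throws IOException {
      output.collect(line, new LongWritable(line.getLength()));
    }
  }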

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java Thu Aug 16 11:45:49 2007
@@ -32,16 +32,21 @@
 import org.apache.hadoop.util.*;
 
 /** An {@link OutputFormat} that writes plain text files. */
-public class TextOutputFormat extends OutputFormatBase {
+public class TextOutputFormat<K extends WritableComparable,
+                              V extends Writable>
+  extends OutputFormatBase<K, V> {
 
-  protected static class LineRecordWriter implements RecordWriter {
+  protected static class LineRecordWriter<K extends WritableComparable,
+                                          V extends Writable>
+    implements RecordWriter<K, V> {
+    
     private DataOutputStream out;
     
     public LineRecordWriter(DataOutputStream out) {
       this.out = out;
     }
     
-    public synchronized void write(WritableComparable key, Writable value)
+    public synchronized void write(K key, V value)
       throws IOException {
 
       if (key == null && value == null) {
@@ -64,8 +69,10 @@
     }
   }
   
-  public RecordWriter getRecordWriter(FileSystem ignored, JobConf job,
-                                      String name, Progressable progress)
+  public RecordWriter<K, V> getRecordWriter(FileSystem ignored,
+                                                  JobConf job,
+                                                  String name,
+                                                  Progressable progress)
     throws IOException {
 
     Path dir = job.getOutputPath();
@@ -73,7 +80,7 @@
     boolean isCompressed = getCompressOutput(job);
     if (!isCompressed) {
       FSDataOutputStream fileOut = fs.create(new Path(dir, name), progress);
-      return new LineRecordWriter(fileOut);
+      return new LineRecordWriter<K, V>(fileOut);
     } else {
       Class codecClass = getOutputCompressorClass(job, GzipCodec.class);
       // create the named codec
@@ -82,8 +89,8 @@
       // build the filename including the extension
       Path filename = new Path(dir, name + codec.getDefaultExtension());
       FSDataOutputStream fileOut = fs.create(filename, progress);
-      return new LineRecordWriter(new DataOutputStream
-                                  (codec.createOutputStream(fileOut)));
+      return new LineRecordWriter<K, V>(new DataOutputStream
+                                        (codec.createOutputStream(fileOut)));
     }
   }      
 }
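
Wiring the generified output format into a job is unchanged at the JobConf
level, since class literals erase the type parameters. A sketch
(illustrative only):

  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.TextOutputFormat;

  public class TextOutputSetup {
    static void configure(JobConf job) {
      // Each pair is printed via toString(); when compression is enabled,
      // GzipCodec is the fallback codec, per
      // getOutputCompressorClass(job, GzipCodec.class) above.
      job.setOutputFormat(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(LongWritable.class);
    }
  }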

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java Thu Aug 16 11:45:49 2007
@@ -63,7 +63,9 @@
  * the key is never ignored.
  * 
  */
-public class FieldSelectionMapReduce implements Mapper, Reducer {
+public class FieldSelectionMapReduce<K extends WritableComparable,
+                                     V extends Writable>
+    implements Mapper<K, V, Text, Text>, Reducer<Text, Text, Text, Text> {
 
   private String mapOutputKeyValueSpec;
 
@@ -133,8 +135,8 @@
   /**
   * The identity function. Input key/value pair is written directly to output.
    */
-  public void map(WritableComparable key, Writable val, OutputCollector output,
-      Reporter reporter) throws IOException {
+  public void map(K key, V val,
+                  OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
     String valStr = val.toString();
     String[] inputValFields = valStr.split(this.fieldSeparator);
     String[] inputKeyFields = null;
@@ -180,7 +182,8 @@
    * @param fieldList an array of field numbers extracted from the specs.
    * @return number n if some field spec is in the form of "n-", -1 otherwise.
    */
-  private int extractFields(String[] fieldListSpec, ArrayList<Integer> fieldList) {
+  private int extractFields(String[] fieldListSpec,
+                            ArrayList<Integer> fieldList) {
     int allFieldsFrom = -1;
     int i = 0;
     int j = 0;
@@ -310,8 +313,9 @@
     return retv;
   }
 
-  public void reduce(WritableComparable key, Iterator values,
-      OutputCollector output, Reporter reporter) throws IOException {
+  public void reduce(Text key, Iterator<Text> values,
+                     OutputCollector<Text, Text> output, Reporter reporter)
+    throws IOException {
 
     String keyStr = key.toString() + this.fieldSeparator;
     while (values.hasNext()) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/HashPartitioner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/HashPartitioner.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/HashPartitioner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/HashPartitioner.java Thu Aug 16 11:45:49 2007
@@ -25,12 +25,14 @@
 import org.apache.hadoop.io.Writable;
 
 /** Partition keys by their {@link Object#hashCode()}. */
-public class HashPartitioner implements Partitioner {
+public class HashPartitioner<K2 extends WritableComparable,
+                             V2 extends Writable>
+    implements Partitioner<K2, V2> {
 
   public void configure(JobConf job) {}
 
   /** Use {@link Object#hashCode()} to partition. */
-  public int getPartition(WritableComparable key, Writable value,
+  public int getPartition(K2 key, V2 value,
                           int numReduceTasks) {
     return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
   }
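
The bit mask matters because hashCode() may be negative and Java's % operator
preserves the sign of its left operand. A small self-contained check:

  public class MaskDemo {
    public static void main(String[] args) {
      int hash = -7;              // hashCode() values can be negative
      int numReduceTasks = 4;
      // -7 % 4 == -3, an invalid partition; masking the sign bit first
      // gives (-7 & 0x7FFFFFFF) == 2147483641, and 2147483641 % 4 == 1.
      System.out.println((hash & Integer.MAX_VALUE) % numReduceTasks);  // 1
    }
  }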

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityMapper.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityMapper.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityMapper.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityMapper.java Thu Aug 16 11:45:49 2007
@@ -29,12 +29,13 @@
 import org.apache.hadoop.io.WritableComparable;
 
 /** Implements the identity function, mapping inputs directly to outputs. */
-public class IdentityMapper extends MapReduceBase implements Mapper {
+public class IdentityMapper<K extends WritableComparable, V extends Writable>
+    extends MapReduceBase implements Mapper<K, V, K, V> {
 
   /** The identity function.  Input key/value pair is written directly to
    * output.*/
-  public void map(WritableComparable key, Writable val,
-                  OutputCollector output, Reporter reporter)
+  public void map(K key, V val,
+                  OutputCollector<K, V> output, Reporter reporter)
     throws IOException {
     output.collect(key, val);
   }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityReducer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityReducer.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityReducer.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityReducer.java Thu Aug 16 11:45:49 2007
@@ -31,14 +31,15 @@
 import org.apache.hadoop.io.WritableComparable;
 
 /** Performs no reduction, writing all input values directly to the output. */
-public class IdentityReducer extends MapReduceBase implements Reducer {
+public class IdentityReducer<K extends WritableComparable, V extends Writable>
+    extends MapReduceBase implements Reducer<K, V, K, V> {
 
   /** Writes all keys and values directly to output. */
-  public void reduce(WritableComparable key, Iterator values,
-                     OutputCollector output, Reporter reporter)
+  public void reduce(K key, Iterator<V> values,
+                     OutputCollector<K, V> output, Reporter reporter)
     throws IOException {
     while (values.hasNext()) {
-      output.collect(key, (Writable)values.next());
+      output.collect(key, values.next());
     }
   }
 	

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/InverseMapper.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/InverseMapper.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/InverseMapper.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/InverseMapper.java Thu Aug 16 11:45:49 2007
@@ -20,23 +20,23 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.MapReduceBase;
-
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.Writable;
 
 
 /** A {@link Mapper} that swaps keys and values. */
-public class InverseMapper extends MapReduceBase implements Mapper {
+public class InverseMapper<K extends WritableComparable,
+                           V extends WritableComparable>
+    extends MapReduceBase implements Mapper<K, V, V, K> {
 
   /** The inverse function.  Input keys and values are swapped.*/
-  public void map(WritableComparable key, Writable value,
-                  OutputCollector output, Reporter reporter)
+  public void map(K key, V value,
+                  OutputCollector<V, K> output, Reporter reporter)
     throws IOException {
-    output.collect((WritableComparable)value, key);
+    output.collect(value, key);
   }
   
 }
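
Note the tightened bound: V now extends WritableComparable rather than
Writable, because the value becomes the output key after the swap. A job
wiring sketch (illustrative only):

  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.lib.InverseMapper;

  public class SwapSetup {
    static void configure(JobConf job) {
      job.setMapperClass(InverseMapper.class);
      // The map output types are swapped relative to the input pairs.
      job.setOutputKeyClass(LongWritable.class);  // former value type
      job.setOutputValueClass(Text.class);        // former key type
    }
  }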

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedPartitioner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedPartitioner.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedPartitioner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedPartitioner.java Thu Aug 16 11:45:49 2007
@@ -23,7 +23,9 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Partitioner;
 
-public class KeyFieldBasedPartitioner implements Partitioner {
+public class KeyFieldBasedPartitioner<K2 extends WritableComparable,
+                                      V2 extends Writable>
+    implements Partitioner<K2, V2> {
 
   private int numOfPartitionFields;
 
@@ -35,7 +37,7 @@
   }
 
   /** Use {@link Object#hashCode()} of the leading key fields to partition. */
-  public int getPartition(WritableComparable key, Writable value,
+  public int getPartition(K2 key, V2 value,
       int numReduceTasks) {
     String partitionKeyStr = key.toString();
     String[] fields = partitionKeyStr.split(this.keyFieldSeparator);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/LongSumReducer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/LongSumReducer.java?view=diff&rev=566798&r1=566797&r2=566798
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/LongSumReducer.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/LongSumReducer.java Thu Aug 16 11:45:49 2007
@@ -30,16 +30,19 @@
 import org.apache.hadoop.io.LongWritable;
 
 /** A {@link Reducer} that sums long values. */
-public class LongSumReducer extends MapReduceBase implements Reducer {
+public class LongSumReducer<K extends WritableComparable>
+    extends MapReduceBase
+    implements Reducer<K, LongWritable, K, LongWritable> {
 
-  public void reduce(WritableComparable key, Iterator values,
-                     OutputCollector output, Reporter reporter)
+  public void reduce(K key, Iterator<LongWritable> values,
+                     OutputCollector<K, LongWritable> output,
+                     Reporter reporter)
     throws IOException {
 
     // sum all values for this key
     long sum = 0;
     while (values.hasNext()) {
-      sum += ((LongWritable)values.next()).get();
+      sum += values.next().get();
     }
 
     // output sum
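
Because summation is associative, the same typed reducer also serves as a
combiner, and the (LongWritable) cast on values.next() is gone. A wiring
sketch (illustrative only):

  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.lib.LongSumReducer;

  public class SumSetup {
    static void configure(JobConf job) {
      job.setCombinerClass(LongSumReducer.class);
      job.setReducerClass(LongSumReducer.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(LongWritable.class);
    }
  }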


