hadoop-mapreduce-commits mailing list archives

From: omal...@apache.org
Subject: svn commit: r899844 [1/2] - in /hadoop/mapreduce/trunk: ./ src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/ src/examples/org/apache/hadoop/examples/ src/examples/org/apache/hadoop/examples/pi/ src/java/org/apache/hadoop/mapred/ src/...
Date: Fri, 15 Jan 2010 23:48:55 GMT
Author: omalley
Date: Fri Jan 15 23:48:54 2010
New Revision: 899844

URL: http://svn.apache.org/viewvc?rev=899844&view=rev
Log:
MAPREDUCE-1126. Reverting due to -1.

Removed:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestAvroSerialization.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/avro/
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/build.xml
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContext.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/BaileyBorweinPlouffe.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/DBCountPageView.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/SecondarySort.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IFile.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Merger.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/ReduceContextImpl.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java

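For context, this revert returns jobs to declaring their intermediate (map output) types by class, as the example drivers in the diffs below show. A minimal sketch of the restored API (the driver class name and job name here are hypothetical, for illustration only):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;

    public class ClassBasedJobSetup {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "example");
        // Restored class-based setters; the metadata-based
        // WritableJobData.setMapOutputKeyClass(...) helpers are
        // removed by this commit.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
      }
    }
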
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Jan 15 23:48:54 2010
@@ -103,10 +103,6 @@
     MAPREDUCE-1295. Add a tool in Rumen for folding and manipulating job
     traces. (Dick King via cdouglas)
 
-    MAPREDUCE-1126. Shuffle now uses Serialization API for
-    comparisons, permitting non-Writable intermediate data.
-    (Aaron Kimball via cutting)
-
     MAPREDUCE-1302. TrackerDistributedCacheManager deletes file
     asynchronously, thus reducing task initialization delays.
     (Zheng Shao via dhruba)

Modified: hadoop/mapreduce/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/build.xml?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/build.xml (original)
+++ hadoop/mapreduce/trunk/build.xml Fri Jan 15 23:48:54 2010
@@ -322,7 +322,7 @@
     </taskdef>
     <protocol destdir="${build.src}">
       <fileset dir="${mapred.src.dir}">
-        <include name="**/*.avpr" />
+	<include name="**/*.avpr" />
       </fileset>
     </protocol>
   </target>
@@ -472,20 +472,7 @@
   <!-- Compile test code                                                  --> 
   <!-- ================================================================== -->
 
-  <target name="avro-generate-test">
-    <!-- generate avro record instances for tests -->
-    <taskdef name="schema" classname="org.apache.avro.specific.SchemaTask">
-      <classpath refid="classpath" />
-    </taskdef>
-    <schema destdir="${test.generated.dir}">
-      <fileset dir="${test.src.dir}">
-        <include name="**/*.avsc" />
-      </fileset>
-    </schema>
-  </target>
-
-  <target name="compile-mapred-test" depends="compile-mapred-classes, compile-examples,
-      ivy-retrieve-test, avro-generate-test">
+  <target name="compile-mapred-test" depends="compile-mapred-classes, compile-examples, ivy-retrieve-test">
 
     <mkdir dir="${test.mapred.build.classes}"/>
     <mkdir dir="${test.mapred.build.testjar}"/>
@@ -493,7 +480,7 @@
 
     <javac 
       encoding="${build.encoding}" 
-      srcdir="${test.src.dir}/mapred;${test.src.dir}/unit;${test.generated.dir}"
+      srcdir="${test.src.dir}/mapred;${test.src.dir}/unit"
       includes="org/apache/hadoop/**/*.java"
       destdir="${test.mapred.build.classes}"
       debug="${javac.debug}"

Modified: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContext.java?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContext.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContext.java Fri Jan 15 23:48:54 2010
@@ -20,11 +20,9 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.serializer.SerializationBase;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -44,13 +42,6 @@
 
   private MockOutputCollector<KEYOUT, VALUEOUT> output;
 
-  /** Return a metadata map that will instantiate serializers
-   * for Writable classes.
-   */
-  private static Map<String, String> getWritableSerializationMap() {
-    return SerializationBase.getMetadataFromClass(Text.class);
-  }
-
   public MockReduceContext(final List<Pair<KEYIN, List<VALUEIN>>> in, 
                            final Counters counters) 
   throws IOException, InterruptedException {
@@ -58,7 +49,7 @@
           new TaskAttemptID("mrunit-jt", 0, TaskType.REDUCE, 0, 0),
           new MockRawKeyValueIterator(), null, null, null,
           new MockOutputCommitter(), new MockReporter(counters), null,
-          getWritableSerializationMap(), getWritableSerializationMap());
+          (Class) Text.class, (Class) Text.class);
     this.inputIter = in.iterator();
     this.output = new MockOutputCollector<KEYOUT, VALUEOUT>();
   }

Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/BaileyBorweinPlouffe.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/BaileyBorweinPlouffe.java?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/BaileyBorweinPlouffe.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/BaileyBorweinPlouffe.java Fri Jan 15 23:48:54 2010
@@ -45,7 +45,6 @@
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.jobdata.WritableJobData;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -316,10 +315,8 @@
 
     // setup mapper
     job.setMapperClass(BbpMapper.class);
-    WritableJobData.setMapOutputKeyClass(job.getConfiguration(),
-        LongWritable.class);
-    WritableJobData.setMapOutputValueClass(job.getConfiguration(),
-        BytesWritable.class);
+    job.setMapOutputKeyClass(LongWritable.class);
+    job.setMapOutputValueClass(BytesWritable.class);
 
     // setup reducer
     job.setReducerClass(BbpReducer.class);

Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/DBCountPageView.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/DBCountPageView.java?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/DBCountPageView.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/DBCountPageView.java Fri Jan 15 23:48:54 2010
@@ -45,7 +45,6 @@
 import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
 import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
-import org.apache.hadoop.mapreduce.lib.jobdata.WritableJobData;
 import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
@@ -400,10 +399,8 @@
 
     DBOutputFormat.setOutput(job, "Pageview", PageviewFieldNames);
     
-    WritableJobData.setMapOutputKeyClass(job.getConfiguration(),
-        Text.class);
-    WritableJobData.setMapOutputValueClass(job.getConfiguration(),
-        LongWritable.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(LongWritable.class);
 
     job.setOutputKeyClass(PageviewRecord.class);
     job.setOutputValueClass(NullWritable.class);

Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/SecondarySort.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/SecondarySort.java?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/SecondarySort.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/SecondarySort.java Fri Jan 15 23:48:54 2010
@@ -32,7 +32,6 @@
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.jobdata.WritableJobData;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -225,10 +224,8 @@
     job.setGroupingComparatorClass(FirstGroupingComparator.class);
 
     // the map output is IntPair, IntWritable
-    WritableJobData.setMapOutputKeyClass(job.getConfiguration(),
-        IntPair.class);
-    WritableJobData.setMapOutputValueClass(job.getConfiguration(),
-        IntWritable.class);
+    job.setMapOutputKeyClass(IntPair.class);
+    job.setMapOutputValueClass(IntWritable.class);
 
     // the reduce output is Text, IntWritable
     job.setOutputKeyClass(Text.class);

Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java Fri Jan 15 23:48:54 2010
@@ -51,7 +51,6 @@
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.hadoop.mapreduce.lib.jobdata.WritableJobData;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -248,10 +247,8 @@
     public void init(Job job) {
       // setup mapper
       job.setMapperClass(SummingMapper.class);
-      WritableJobData.setMapOutputKeyClass(job.getConfiguration(),
-          NullWritable.class);
-      WritableJobData.setMapOutputValueClass(job.getConfiguration(),
-          TaskResult.class);
+      job.setMapOutputKeyClass(NullWritable.class);
+      job.setMapOutputValueClass(TaskResult.class);
 
       // zero reducer
       job.setNumReduceTasks(0);
@@ -303,10 +300,8 @@
     public void init(Job job) {
       // setup mapper
       job.setMapperClass(PartitionMapper.class);
-      WritableJobData.setMapOutputKeyClass(job.getConfiguration(),
-          IntWritable.class);
-      WritableJobData.setMapOutputValueClass(job.getConfiguration(),
-          SummationWritable.class);
+      job.setMapOutputKeyClass(IntWritable.class);
+      job.setMapOutputValueClass(SummationWritable.class);
 
       // setup partitioner
       job.setPartitionerClass(IndexPartitioner.class);
@@ -608,4 +603,4 @@
   public static void main(String[] args) throws Exception {
     System.exit(ToolRunner.run(null, new DistSum(), args));
   }
-}
+}
\ No newline at end of file

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IFile.java?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IFile.java Fri Jan 15 23:48:54 2010
@@ -37,7 +37,8 @@
 import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.serializer.SerializerBase;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
 
 /**
  * <code>IFile</code> is the simple <key-len, value-len, key, value> format
@@ -77,16 +78,18 @@
 
     IFileOutputStream checksumOut;
 
-    SerializerBase<K> keySerializer;
-    SerializerBase<V> valueSerializer;
+    Class<K> keyClass;
+    Class<V> valueClass;
+    Serializer<K> keySerializer;
+    Serializer<V> valueSerializer;
     
     DataOutputBuffer buffer = new DataOutputBuffer();
 
     public Writer(Configuration conf, FileSystem fs, Path file, 
-                  boolean createSerializers,
+                  Class<K> keyClass, Class<V> valueClass,
                   CompressionCodec codec,
                   Counters.Counter writesCounter) throws IOException {
-      this(conf, fs.create(file), createSerializers, codec,
+      this(conf, fs.create(file), keyClass, valueClass, codec,
            writesCounter);
       ownOutputStream = true;
     }
@@ -96,7 +99,7 @@
     }
 
     public Writer(Configuration conf, FSDataOutputStream out, 
-        boolean createSerializers,
+        Class<K> keyClass, Class<V> valueClass,
         CompressionCodec codec, Counters.Counter writesCounter)
         throws IOException {
       this.writtenRecordsCounter = writesCounter;
@@ -114,27 +117,30 @@
         this.out = new FSDataOutputStream(checksumOut,null);
       }
       
+      this.keyClass = keyClass;
+      this.valueClass = valueClass;
 
-      if (createSerializers) {
-        JobConf job = new JobConf(conf);
-        this.keySerializer = job.getMapOutputKeySerializer();
-        this.valueSerializer = job.getMapOutputValueSerializer();
+      if (keyClass != null) {
+        SerializationFactory serializationFactory = 
+          new SerializationFactory(conf);
+        this.keySerializer = serializationFactory.getSerializer(keyClass);
         this.keySerializer.open(buffer);
+        this.valueSerializer = serializationFactory.getSerializer(valueClass);
         this.valueSerializer.open(buffer);
       }
     }
 
     public Writer(Configuration conf, FileSystem fs, Path file) 
     throws IOException {
-      this(conf, fs, file, false, null, null);
+      this(conf, fs, file, null, null, null, null);
     }
 
     public void close() throws IOException {
 
       // When IFile writer is created by BackupStore, we do not have
-      // serializers created. So, check before closing the
+      // Key and Value classes set. So, check before closing the
       // serializers
-      if (null != keySerializer) {
+      if (keyClass != null) {
         keySerializer.close();
         valueSerializer.close();
       }
@@ -177,6 +183,12 @@
     }
 
     public void append(K key, V value) throws IOException {
+      if (key.getClass() != keyClass)
+        throw new IOException("wrong key class: "+ key.getClass()
+                              +" is not "+ keyClass);
+      if (value.getClass() != valueClass)
+        throw new IOException("wrong value class: "+ value.getClass()
+                              +" is not "+ valueClass);
 
       // Append the 'key'
       keySerializer.serialize(key);

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Fri Jan 15 23:48:54 2010
@@ -24,8 +24,6 @@
 import java.net.URL;
 import java.net.URLDecoder;
 import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
@@ -38,11 +36,6 @@
 
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.serializer.DeserializerBase;
-import org.apache.hadoop.io.serializer.SerializationBase;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.SerializerBase;
-import org.apache.hadoop.io.serializer.WritableSerialization;
 
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
@@ -50,7 +43,6 @@
 import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
 import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
 import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.lib.jobdata.ClassBasedJobData;
 import org.apache.hadoop.mapreduce.util.ConfigUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
@@ -718,111 +710,6 @@
   }
   
   /**
-   * Get the metadata used by the serialization framework to instantiate
-   * (de)serializers for key data emitted by mappers.
-   *
-   * @return the metadata used by the serialization framework for the mapper
-   * output key.
-   */
-  public Map<String, String> getMapOutputKeySerializationMetadata() {
-    Map<String, String> metadata = null;
-    if (getBoolean(JobContext.MAP_OUTPUT_KEY_METADATA_SET, false)) {
-      metadata = getMap(JobContext.MAP_OUTPUT_KEY_METADATA);
-    } else {
-      // Wasn't set via the new metadata map. Maybe it was set by the deprecated
-      // class name?
-      metadata = new HashMap<String, String>();
-      String deprecatedKeyClassName = get(JobContext.MAP_OUTPUT_KEY_CLASS);
-      if (null != deprecatedKeyClassName) {
-        // Yes, it was.
-        metadata.put(WritableSerialization.CLASS_KEY, deprecatedKeyClassName);
-      } else {
-        // Intermediate (k, v) types weren't explicitly set. Build a metadata
-        // map based on the final (k, v) types.
-        metadata.put(WritableSerialization.CLASS_KEY,
-            getOutputKeyClass().getName());
-      }
-    }
-
-    return metadata;
-  }
-
-  /**
-   * Get the metadata used by the serialization framework to instantiate
-   * (de)serializers for value data emitted by mappers.
-   *
-   * @return the metadata used by the serialization framework for the mapper
-   * output value.
-   */
-  public Map<String, String> getMapOutputValueSerializationMetadata() {
-    Map<String, String> metadata = null;
-    if (getBoolean(JobContext.MAP_OUTPUT_VALUE_METADATA_SET, false)) {
-      metadata = getMap(JobContext.MAP_OUTPUT_VALUE_METADATA);
-    } else {
-      // Wasn't set via the new metadata map. Maybe it was set by the deprecated
-      // class name?
-      metadata = new HashMap<String, String>();
-      String deprecatedValClassName = get(JobContext.MAP_OUTPUT_VALUE_CLASS);
-      if (null != deprecatedValClassName) {
-        // Yes, it was.
-        metadata.put(WritableSerialization.CLASS_KEY, deprecatedValClassName);
-      } else {
-        // Intermediate (k, v) types weren't explicitly set. Build a metadata
-        // map based on the final (k, v) types.
-        metadata.put(WritableSerialization.CLASS_KEY,
-            getOutputValueClass().getName());
-      }
-    }
-
-    return metadata;
-  }
-
-  /**
-   * Get the serializer to encode keys from the mapper.
-   *
-   * @return the {@link SerializerBase} for the mapper output keys.
-   */
-  public <T> SerializerBase<T> getMapOutputKeySerializer() {
-    Map<String, String> metadata = getMapOutputKeySerializationMetadata();
-    SerializationFactory factory = new SerializationFactory(this);
-    return factory.getSerializer(metadata);
-  }
-
-  /**
-   * Get the deserializer to decode keys from the mapper.
-   *
-   * @return the {@link DeserializerBase} for the mapper output keys.
-   */
-  public <T> DeserializerBase<T> getMapOutputKeyDeserializer() {
-    Map<String, String> metadata = getMapOutputKeySerializationMetadata();
-    SerializationFactory factory = new SerializationFactory(this);
-    return factory.getDeserializer(metadata);
-  }
-
-  /**
-   * Get the serializer to encode values from the mapper.
-   *
-   * @return the {@link SerializerBase} for the mapper output values.
-   */
-  public <T> SerializerBase<T> getMapOutputValueSerializer() {
-    Map<String, String> metadata = getMapOutputValueSerializationMetadata();
-    SerializationFactory factory = new SerializationFactory(this);
-    return factory.getSerializer(metadata);
-  }
-
-  /**
-   * Get the deserializer to decode values from the mapper.
-   *
-   * @return the {@link DeserializerBase} for the mapper output values.
-   */
-  public <T> DeserializerBase<T> getMapOutputValueDeserializer() {
-    Map<String, String> metadata = getMapOutputValueSerializationMetadata();
-    SerializationFactory factory = new SerializationFactory(this);
-    return factory.getDeserializer(metadata);
-  }
-
-  @Deprecated
-  /**
    * Get the key class for the map output data. If it is not set, use the
    * (final) output key class. This allows the map output key class to be
    * different than the final output key class.
@@ -830,10 +717,13 @@
    * @return the map output key class.
    */
   public Class<?> getMapOutputKeyClass() {
-    return ClassBasedJobData.getMapOutputKeyClass(this);
+    Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class);
+    if (retv == null) {
+      retv = getOutputKeyClass();
+    }
+    return retv;
   }
   
-  @Deprecated
   /**
    * Set the key class for the map output data. This allows the user to
    * specify the map output key class to be different than the final output
@@ -842,10 +732,9 @@
    * @param theClass the map output key class.
    */
   public void setMapOutputKeyClass(Class<?> theClass) {
-    ClassBasedJobData.setMapOutputKeyClass(this, theClass);
+    setClass(JobContext.MAP_OUTPUT_KEY_CLASS, theClass, Object.class);
   }
   
-  @Deprecated
   /**
    * Get the value class for the map output data. If it is not set, use the
    * (final) output value class This allows the map output value class to be
@@ -854,10 +743,14 @@
    * @return the map output value class.
    */
   public Class<?> getMapOutputValueClass() {
-    return ClassBasedJobData.getMapOutputValueClass(this);
+    Class<?> retv = getClass(JobContext.MAP_OUTPUT_VALUE_CLASS, null,
+        Object.class);
+    if (retv == null) {
+      retv = getOutputValueClass();
+    }
+    return retv;
   }
   
-  @Deprecated
   /**
    * Set the value class for the map output data. This allows the user to
    * specify the map output value class to be different than the final output
@@ -866,7 +759,7 @@
    * @param theClass the map output value class.
    */
   public void setMapOutputValueClass(Class<?> theClass) {
-    ClassBasedJobData.setMapOutputValueClass(this, theClass);
+    setClass(JobContext.MAP_OUTPUT_VALUE_CLASS, theClass, Object.class);
   }
   
   /**
@@ -893,16 +786,12 @@
    * 
    * @return the {@link RawComparator} comparator used to compare keys.
    */
-  @SuppressWarnings("unchecked")
   public RawComparator getOutputKeyComparator() {
     Class<? extends RawComparator> theClass = getClass(
       JobContext.KEY_COMPARATOR, null, RawComparator.class);
     if (theClass != null)
       return ReflectionUtils.newInstance(theClass, this);
-    SerializationFactory factory = new SerializationFactory(this);
-    Map<String, String> metadata = getMapOutputKeySerializationMetadata();
-    SerializationBase serialization = factory.getSerialization(metadata);
-    return serialization.getRawComparator(metadata);
+    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));
   }
 
   /**

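With the serialization-metadata path removed, getOutputKeyComparator() above falls back to WritableComparator.get() on the map output key class whenever no explicit RawComparator is configured. A minimal sketch of that fallback (assuming Text keys; the demo class is hypothetical):

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparator;

    public class ComparatorFallback {
      public static void main(String[] args) {
        // Same lookup the restored getOutputKeyComparator() performs.
        WritableComparator cmp = WritableComparator.get(Text.class);
        // Negative result: "a" sorts before "b".
        System.out.println(cmp.compare(new Text("a"), new Text("b")));
      }
    }
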
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Fri Jan 15 23:48:54 2010
@@ -48,10 +48,9 @@
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.io.serializer.DeserializerBase;
-import org.apache.hadoop.io.serializer.SerializationBase;
+import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.SerializerBase;
+import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
@@ -347,9 +346,8 @@
      throw wrap;
    }
    SerializationFactory factory = new SerializationFactory(conf);
-   DeserializerBase<T> deserializer = 
-       (DeserializerBase<T>) factory.getDeserializer(
-       SerializationBase.getMetadataFromClass(cls));
+   Deserializer<T> deserializer = 
+     (Deserializer<T>) factory.getDeserializer(cls);
    deserializer.open(inFile);
    T split = deserializer.deserialize(null);
    long pos = inFile.getPos();
@@ -616,8 +614,8 @@
       (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
         ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
     // rebuild the input split
-    org.apache.hadoop.mapreduce.InputSplit split =
-        getSplitDetails(new Path(splitIndex.getSplitLocation()),
+    org.apache.hadoop.mapreduce.InputSplit split = null;
+    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
         splitIndex.getStartOffset());
 
     org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
@@ -712,9 +710,12 @@
     private final int partitions;
     private final JobConf job;
     private final TaskReporter reporter;
+    private final Class<K> keyClass;
+    private final Class<V> valClass;
     private final RawComparator<K> comparator;
-    private final SerializerBase<K> keySerializer;
-    private final SerializerBase<V> valSerializer;
+    private final SerializationFactory serializationFactory;
+    private final Serializer<K> keySerializer;
+    private final Serializer<V> valSerializer;
     private final CombinerRunner<K,V> combinerRunner;
     private final CombineOutputCollector<K, V> combineCollector;
     
@@ -814,9 +815,12 @@
       LOG.info("record buffer = " + softRecordLimit + "/" + kvoffsets.length);
       // k/v serialization
       comparator = job.getOutputKeyComparator();
-      keySerializer = job.getMapOutputKeySerializer();
+      keyClass = (Class<K>)job.getMapOutputKeyClass();
+      valClass = (Class<V>)job.getMapOutputValueClass();
+      serializationFactory = new SerializationFactory(job);
+      keySerializer = serializationFactory.getSerializer(keyClass);
       keySerializer.open(bb);
-      valSerializer = job.getMapOutputValueSerializer();
+      valSerializer = serializationFactory.getSerializer(valClass);
       valSerializer.open(bb);
       // counters
       mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
@@ -863,6 +867,16 @@
     public synchronized void collect(K key, V value, int partition
                                      ) throws IOException {
       reporter.progress();
+      if (key.getClass() != keyClass) {
+        throw new IOException("Type mismatch in key from map: expected "
+                              + keyClass.getName() + ", received "
+                              + key.getClass().getName());
+      }
+      if (value.getClass() != valClass) {
+        throw new IOException("Type mismatch in value from map: expected "
+                              + valClass.getName() + ", received "
+                              + value.getClass().getName());
+      }
       final int kvnext = (kvindex + 1) % kvoffsets.length;
       if (--recordRemaining <= 0) {
         // Possible for check to remain < zero, if soft limit remains
@@ -1277,7 +1291,8 @@
           IFile.Writer<K, V> writer = null;
           try {
             long segmentStart = out.getPos();
-            writer = new Writer<K, V>(job, out, true, codec, spilledRecordsCounter);
+            writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
+                                      spilledRecordsCounter);
             if (combinerRunner == null) {
               // spill directly
               DataInputBuffer key = new DataInputBuffer();
@@ -1365,8 +1380,8 @@
           try {
             long segmentStart = out.getPos();
             // Create a new codec, don't care!
-            writer = new IFile.Writer<K,V>(job, out, true, codec,
-                spilledRecordsCounter);
+            writer = new IFile.Writer<K,V>(job, out, keyClass, valClass, codec,
+                                            spilledRecordsCounter);
 
             if (i == partition) {
               final long recordStart = out.getPos();
@@ -1525,7 +1540,7 @@
           for (int i = 0; i < partitions; i++) {
             long segmentStart = finalOut.getPos();
             Writer<K, V> writer =
-              new Writer<K, V>(job, finalOut, true, codec, null);
+              new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null);
             writer.close();
             rec.startOffset = segmentStart;
             rec.rawLength = writer.getRawLength();
@@ -1569,15 +1584,17 @@
           //merge
           @SuppressWarnings("unchecked")
           RawKeyValueIterator kvIter = Merger.merge(job, rfs,
-                         codec, segmentList, mergeFactor,
+                         keyClass, valClass, codec,
+                         segmentList, mergeFactor,
                          new Path(mapId.toString()),
                          job.getOutputKeyComparator(), reporter, sortSegments,
                          null, spilledRecordsCounter, sortPhase.phase());
 
           //write merged output to disk
           long segmentStart = finalOut.getPos();
-          Writer<K, V> writer = new Writer<K, V>(job, finalOut, true, codec,
-              spilledRecordsCounter);
+          Writer<K, V> writer =
+              new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
+                               spilledRecordsCounter);
           if (combinerRunner == null || numSpills < minSpillsForCombine) {
             Merger.writeFile(kvIter, writer, reporter, job);
           } else {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Merger.java?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Merger.java Fri Jan 15 23:48:54 2010
@@ -54,6 +54,7 @@
 
   public static <K extends Object, V extends Object>
   RawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                            Class<K> keyClass, Class<V> valueClass, 
                             CompressionCodec codec,
                             Path[] inputs, boolean deleteInputs, 
                             int mergeFactor, Path tmpDir,
@@ -64,7 +65,7 @@
   throws IOException {
     return 
       new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator, 
-                           reporter, null).merge(
+                           reporter, null).merge(keyClass, valueClass,
                                            mergeFactor, tmpDir,
                                            readsCounter, writesCounter, 
                                            mergePhase);
@@ -72,6 +73,7 @@
 
   public static <K extends Object, V extends Object>
   RawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                            Class<K> keyClass, Class<V> valueClass, 
                             CompressionCodec codec,
                             Path[] inputs, boolean deleteInputs, 
                             int mergeFactor, Path tmpDir,
@@ -85,6 +87,7 @@
     return 
       new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator, 
                            reporter, mergedMapOutputsCounter).merge(
+                                           keyClass, valueClass,
                                            mergeFactor, tmpDir,
                                            readsCounter, writesCounter,
                                            mergePhase);
@@ -92,6 +95,7 @@
   
   public static <K extends Object, V extends Object>
   RawKeyValueIterator merge(Configuration conf, FileSystem fs, 
+                            Class<K> keyClass, Class<V> valueClass, 
                             List<Segment<K, V>> segments, 
                             int mergeFactor, Path tmpDir,
                             RawComparator<K> comparator, Progressable reporter,
@@ -99,13 +103,14 @@
                             Counters.Counter writesCounter,
                             Progress mergePhase)
       throws IOException {
-    return merge(conf, fs, segments, mergeFactor, tmpDir,
+    return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir,
                  comparator, reporter, false, readsCounter, writesCounter,
                  mergePhase);
   }
 
   public static <K extends Object, V extends Object>
   RawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                            Class<K> keyClass, Class<V> valueClass,
                             List<Segment<K, V>> segments,
                             int mergeFactor, Path tmpDir,
                             RawComparator<K> comparator, Progressable reporter,
@@ -115,7 +120,7 @@
                             Progress mergePhase)
       throws IOException {
     return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
-                           sortSegments).merge(
+                           sortSegments).merge(keyClass, valueClass,
                                                mergeFactor, tmpDir,
                                                readsCounter, writesCounter,
                                                mergePhase);
@@ -123,6 +128,7 @@
 
   public static <K extends Object, V extends Object>
   RawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                            Class<K> keyClass, Class<V> valueClass,
                             CompressionCodec codec,
                             List<Segment<K, V>> segments,
                             int mergeFactor, Path tmpDir,
@@ -133,7 +139,7 @@
                             Progress mergePhase)
       throws IOException {
     return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
-                           sortSegments, codec).merge(
+                           sortSegments, codec).merge(keyClass, valueClass,
                                                mergeFactor, tmpDir,
                                                readsCounter, writesCounter,
                                                mergePhase);
@@ -141,6 +147,7 @@
 
   public static <K extends Object, V extends Object>
     RawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                            Class<K> keyClass, Class<V> valueClass,
                             List<Segment<K, V>> segments,
                             int mergeFactor, int inMemSegments, Path tmpDir,
                             RawComparator<K> comparator, Progressable reporter,
@@ -150,7 +157,7 @@
                             Progress mergePhase)
       throws IOException {
     return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
-                           sortSegments).merge(
+                           sortSegments).merge(keyClass, valueClass,
                                                mergeFactor, inMemSegments,
                                                tmpDir,
                                                readsCounter, writesCounter,
@@ -160,6 +167,7 @@
 
   static <K extends Object, V extends Object>
   RawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                          Class<K> keyClass, Class<V> valueClass,
                           CompressionCodec codec,
                           List<Segment<K, V>> segments,
                           int mergeFactor, int inMemSegments, Path tmpDir,
@@ -170,7 +178,7 @@
                           Progress mergePhase)
     throws IOException {
   return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
-                         sortSegments, codec).merge(
+                         sortSegments, codec).merge(keyClass, valueClass,
                                              mergeFactor, inMemSegments,
                                              tmpDir,
                                              readsCounter, writesCounter,
@@ -516,19 +524,21 @@
       return comparator.compare(key1.getData(), s1, l1, key2.getData(), s2, l2) < 0;
     }
     
-    public RawKeyValueIterator merge(int factor, Path tmpDir,
+    public RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
+                                     int factor, Path tmpDir,
                                      Counters.Counter readsCounter,
                                      Counters.Counter writesCounter,
                                      Progress mergePhase)
         throws IOException {
-      return merge(factor, 0, tmpDir,
+      return merge(keyClass, valueClass, factor, 0, tmpDir,
                    readsCounter, writesCounter, mergePhase);
     }
 
-    RawKeyValueIterator merge(int factor, int inMem, Path tmpDir,
-                              Counters.Counter readsCounter,
-                              Counters.Counter writesCounter,
-                              Progress mergePhase)
+    RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
+                                     int factor, int inMem, Path tmpDir,
+                                     Counters.Counter readsCounter,
+                                     Counters.Counter writesCounter,
+                                     Progress mergePhase)
         throws IOException {
       LOG.info("Merging " + segments.size() + " sorted segments");
 
@@ -657,7 +667,7 @@
                                               approxOutputSize, conf);
 
           Writer<K, V> writer = 
-            new Writer<K, V>(conf, fs, outputFile, true, codec,
+            new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass, codec,
                              writesCounter);
           writeFile(this, writer, reporter, conf);
           writer.close();

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Fri Jan 15 23:48:54 2010
@@ -24,7 +24,6 @@
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Map;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
@@ -43,7 +42,6 @@
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.io.serializer.SerializationBase;
 import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -196,11 +194,11 @@
           extends ValuesIterator<KEY,VALUE> {
     public ReduceValuesIterator (RawKeyValueIterator in,
                                  RawComparator<KEY> comparator, 
-                                 Map<String, String> keyMetadata,
-                                 Map<String, String> valueMetadata,
+                                 Class<KEY> keyClass,
+                                 Class<VALUE> valClass,
                                  Configuration conf, Progressable reporter)
       throws IOException {
-      super(in, comparator, keyMetadata, valueMetadata, conf, reporter);
+      super(in, comparator, keyClass, valClass, conf, reporter);
     }
 
     @Override
@@ -226,19 +224,18 @@
      private Counters.Counter skipGroupCounter;
      private Counters.Counter skipRecCounter;
      private long grpIndex = -1;
-     private Map<String, String> keyMetadata;
-     private Map<String, String> valueMetadata;
+     private Class<KEY> keyClass;
+     private Class<VALUE> valClass;
      private SequenceFile.Writer skipWriter;
      private boolean toWriteSkipRecs;
      private boolean hasNext;
      private TaskReporter reporter;
      
      public SkippingReduceValuesIterator(RawKeyValueIterator in,
-         RawComparator<KEY> comparator,
-         Map<String, String> keyMetadata, Map<String, String> valueMetadata,
-         Configuration conf, TaskReporter reporter,
+         RawComparator<KEY> comparator, Class<KEY> keyClass,
+         Class<VALUE> valClass, Configuration conf, TaskReporter reporter,
          TaskUmbilicalProtocol umbilical) throws IOException {
-       super(in, comparator, keyMetadata, valueMetadata, conf, reporter);
+       super(in, comparator, keyClass, valClass, conf, reporter);
        this.umbilical = umbilical;
        this.skipGroupCounter = 
          reporter.getCounter(TaskCounter.REDUCE_SKIPPED_GROUPS);
@@ -246,8 +243,8 @@
          reporter.getCounter(TaskCounter.REDUCE_SKIPPED_RECORDS);
        this.toWriteSkipRecs = toWriteSkipRecs() &&  
          SkipBadRecords.getSkipOutputPath(conf)!=null;
-       this.keyMetadata = keyMetadata;
-       this.valueMetadata = valueMetadata;
+       this.keyClass = keyClass;
+       this.valClass = valClass;
        this.reporter = reporter;
        skipIt = getSkipRanges().skipRangeIterator();
        mayBeSkip();
@@ -293,39 +290,15 @@
        skipRecCounter.increment(skipRec);
        reportNextRecordRange(umbilical, grpIndex);
      }
-
-     private Class<?> getClassFromMetadata(Map<String, String> metadata)
-         throws IOException {
-       String classname = metadata.get(SerializationBase.CLASS_KEY);
-       if (classname == null) {
-         throw new IOException(
-             "No key class name specified; Record-skipping requires the use of "
-             + "Writable serialization");
-       }
-       try {
-         return conf.getClassByName(classname);
-       } catch (ClassNotFoundException e) {
-         throw new IOException(e);
-       }
-     }
-
+     
      @SuppressWarnings("unchecked")
      private void writeSkippedRec(KEY key, VALUE value) throws IOException{
        if(skipWriter==null) {
-         // Skipped record writing is currently done with SequenceFiles
-         // which are based on Writable serialization. This can only
-         // be done with Writable key and value types. Extract the
-         // key and value class names from the metadata. If this can't
-         // be done, then throw an IOException.
-         Class<?> keyClass = getClassFromMetadata(keyMetadata);
-         Class<?> valClass = getClassFromMetadata(valueMetadata);
-
-         // Now actually open the skipWriter.
          Path skipDir = SkipBadRecords.getSkipOutputPath(conf);
          Path skipFile = new Path(skipDir, getTaskID().toString());
          skipWriter = SequenceFile.createWriter(
                skipFile.getFileSystem(conf), conf, skipFile,
-               keyClass, valClass,
+               keyClass, valClass, 
                CompressionType.BLOCK, reporter);
        }
        skipWriter.append(key, value);
@@ -385,8 +358,8 @@
       rIter = shuffle.run();
     } else {
       final FileSystem rfs = FileSystem.getLocal(job).getRaw();
-      rIter = Merger.merge(job, rfs,
-                           codec,
+      rIter = Merger.merge(job, rfs, job.getMapOutputKeyClass(),
+                           job.getMapOutputValueClass(), codec, 
                            getMapFiles(rfs, true),
                            !conf.getKeepFailedTaskFiles(), 
                            job.getInt(JobContext.IO_SORT_FACTOR, 100),
@@ -400,18 +373,16 @@
     sortPhase.complete();                         // sort is complete
     setPhase(TaskStatus.Phase.REDUCE); 
     statusUpdate(umbilical);
-    Map<String, String> keyMetadata =
-        job.getMapOutputKeySerializationMetadata();
-    Map<String, String> valueMetadata =
-        job.getMapOutputValueSerializationMetadata();
+    Class keyClass = job.getMapOutputKeyClass();
+    Class valueClass = job.getMapOutputValueClass();
     RawComparator comparator = job.getOutputValueGroupingComparator();
 
     if (useNewApi) {
-      runNewReducer(job, umbilical, reporter, rIter, comparator,
-                    keyMetadata, valueMetadata);
+      runNewReducer(job, umbilical, reporter, rIter, comparator, 
+                    keyClass, valueClass);
     } else {
-      runOldReducer(job, umbilical, reporter, rIter, comparator,
-                    keyMetadata, valueMetadata);
+      runOldReducer(job, umbilical, reporter, rIter, comparator, 
+                    keyClass, valueClass);
     }
     done(umbilical, reporter);
   }
@@ -423,8 +394,8 @@
                      final TaskReporter reporter,
                      RawKeyValueIterator rIter,
                      RawComparator<INKEY> comparator,
-                     Map<String, String> keyMetadata,
-                     Map<String, String> valueMetadata) throws IOException {
+                     Class<INKEY> keyClass,
+                     Class<INVALUE> valueClass) throws IOException {
     Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer = 
       ReflectionUtils.newInstance(job.getReducerClass(), job);
     // make output collector
@@ -454,10 +425,10 @@
       
       ReduceValuesIterator<INKEY,INVALUE> values = isSkipping() ? 
           new SkippingReduceValuesIterator<INKEY,INVALUE>(rIter, 
-              comparator, keyMetadata, valueMetadata,
+              comparator, keyClass, valueClass, 
               job, reporter, umbilical) :
           new ReduceValuesIterator<INKEY,INVALUE>(rIter, 
-          job.getOutputValueGroupingComparator(), keyMetadata, valueMetadata,
+          job.getOutputValueGroupingComparator(), keyClass, valueClass, 
           job, reporter);
       values.informReduceProgress();
       while (values.more()) {
@@ -519,8 +490,8 @@
                      final TaskReporter reporter,
                      RawKeyValueIterator rIter,
                      RawComparator<INKEY> comparator,
-                     Map<String, String> keyMetadata,
-                     Map<String, String> valueMetadata
+                     Class<INKEY> keyClass,
+                     Class<INVALUE> valueClass
                      ) throws IOException,InterruptedException, 
                               ClassNotFoundException {
     // wrap value iterator to report progress.
@@ -564,8 +535,8 @@
                                                reduceInputValueCounter, 
                                                trackedRW,
                                                committer,
-                                               reporter, comparator,
-                                               keyMetadata, valueMetadata);
+                                               reporter, comparator, keyClass,
+                                               valueClass);
     reducer.run(reducerContext);
     output.close(reducerContext);
   }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java Fri Jan 15 23:48:54 2010
@@ -43,7 +43,7 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.serializer.DeserializerBase;
+import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapreduce.TaskCounter;
@@ -1004,28 +1004,24 @@
     private boolean more;                         // more in file
     private RawComparator<KEY> comparator;
     protected Progressable reporter;
-    private DeserializerBase<KEY> keyDeserializer;
-    private DeserializerBase<VALUE> valDeserializer;
+    private Deserializer<KEY> keyDeserializer;
+    private Deserializer<VALUE> valDeserializer;
     private DataInputBuffer keyIn = new DataInputBuffer();
     private DataInputBuffer valueIn = new DataInputBuffer();
     
     public ValuesIterator (RawKeyValueIterator in, 
                            RawComparator<KEY> comparator, 
-                           Map<String, String> keyMetadata,
-                           Map<String, String> valueMetadata,
-                           Configuration conf,
+                           Class<KEY> keyClass,
+                           Class<VALUE> valClass, Configuration conf, 
                            Progressable reporter)
       throws IOException {
       this.in = in;
       this.comparator = comparator;
       this.reporter = reporter;
-      SerializationFactory serializationFactory =
-          new SerializationFactory(conf);
-      this.keyDeserializer =
-          serializationFactory.getDeserializer(keyMetadata);
+      SerializationFactory serializationFactory = new SerializationFactory(conf);
+      this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
       this.keyDeserializer.open(keyIn);
-      this.valDeserializer =
-          serializationFactory.getDeserializer(valueMetadata);
+      this.valDeserializer = serializationFactory.getDeserializer(valClass);
       this.valDeserializer.open(this.valueIn);
       readNextKey();
       key = nextKey;
@@ -1116,10 +1112,10 @@
     private final Counters.Counter combineInputCounter;
 
     public CombineValuesIterator(RawKeyValueIterator in,
-        RawComparator<KEY> comparator, Map<String, String> keyMetadata,
-        Map<String, String> valueMetadata, Configuration conf, Reporter reporter,
+        RawComparator<KEY> comparator, Class<KEY> keyClass,
+        Class<VALUE> valClass, Configuration conf, Reporter reporter,
         Counters.Counter combineInputCounter) throws IOException {
-      super(in, comparator, keyMetadata, valueMetadata, conf, reporter);
+      super(in, comparator, keyClass, valClass, conf, reporter);
       this.combineInputCounter = combineInputCounter;
     }
 
@@ -1143,8 +1139,7 @@
                       org.apache.hadoop.mapreduce.OutputCommitter committer,
                       org.apache.hadoop.mapreduce.StatusReporter reporter,
                       RawComparator<INKEY> comparator,
-                      Map<String, String> inKeyMetadata,
-                      Map<String, String> inValMetadata
+                      Class<INKEY> keyClass, Class<INVALUE> valueClass
   ) throws IOException, InterruptedException {
     org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
     reduceContext = 
@@ -1156,8 +1151,8 @@
                                                               committer, 
                                                               reporter, 
                                                               comparator, 
-                                                              inKeyMetadata,
-                                                              inValMetadata);
+                                                              keyClass, 
+                                                              valueClass);
 
     org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
         reducerContext = 
@@ -1221,8 +1216,8 @@
   
   protected static class OldCombinerRunner<K,V> extends CombinerRunner<K,V> {
     private final Class<? extends Reducer<K,V,K,V>> combinerClass;
-    private final Map<String, String> keyMetadata;
-    private final Map<String, String> valueMetadata;
+    private final Class<K> keyClass;
+    private final Class<V> valueClass;
     private final RawComparator<K> comparator;
 
     @SuppressWarnings("unchecked")
@@ -1232,8 +1227,8 @@
                                 TaskReporter reporter) {
       super(inputCounter, conf, reporter);
       combinerClass = cls;
-      keyMetadata = job.getMapOutputKeySerializationMetadata();
-      valueMetadata = job.getMapOutputValueSerializationMetadata();
+      keyClass = (Class<K>) job.getMapOutputKeyClass();
+      valueClass = (Class<V>) job.getMapOutputValueClass();
       comparator = (RawComparator<K>) job.getOutputKeyComparator();
     }
 
@@ -1245,8 +1240,8 @@
         ReflectionUtils.newInstance(combinerClass, job);
       try {
         CombineValuesIterator<K,V> values = 
-          new CombineValuesIterator<K,V>(kvIter, comparator, keyMetadata,
-                                         valueMetadata, job, Reporter.NULL,
+          new CombineValuesIterator<K,V>(kvIter, comparator, keyClass, 
+                                         valueClass, job, Reporter.NULL,
                                          inputCounter);
         while (values.more()) {
           combiner.reduce(values.getKey(), values, combineCollector,
@@ -1264,8 +1259,8 @@
         reducerClass;
     private final org.apache.hadoop.mapreduce.TaskAttemptID taskId;
     private final RawComparator<K> comparator;
-    private final Map<String, String> keyMetadata;
-    private final Map<String, String> valueMetadata;
+    private final Class<K> keyClass;
+    private final Class<V> valueClass;
     private final org.apache.hadoop.mapreduce.OutputCommitter committer;
 
     @SuppressWarnings("unchecked")
@@ -1279,8 +1274,8 @@
       super(inputCounter, job, reporter);
       this.reducerClass = reducerClass;
       this.taskId = taskId;
-      keyMetadata = context.getMapOutputKeySerializationMetadata();
-      valueMetadata = context.getMapOutputValueSerializationMetadata();
+      keyClass = (Class<K>) context.getMapOutputKeyClass();
+      valueClass = (Class<V>) context.getMapOutputValueClass();
       comparator = (RawComparator<K>) context.getSortComparator();
       this.committer = committer;
     }
@@ -1318,9 +1313,8 @@
                                                 iterator, null, inputCounter, 
                                                 new OutputConverter(collector),
                                                 committer,
-                                                reporter, comparator,
-                                                keyMetadata,
-                                                valueMetadata);
+                                                reporter, comparator, keyClass,
+                                                valueClass);
       reducer.run(reducerContext);
     } 
   }
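
The net effect of the Task.java hunks is that CombineValuesIterator and createReduceContext once again take the key/value Class objects and hand them to SerializationFactory, rather than metadata maps. Below is a minimal sketch of that class-based lookup, assuming the default io.serializations setting (WritableSerialization) and an IntWritable payload; the class name is illustrative only.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.serializer.Deserializer;
    import org.apache.hadoop.io.serializer.SerializationFactory;
    import org.apache.hadoop.io.serializer.Serializer;

    public class ClassBasedSerializationSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        SerializationFactory factory = new SerializationFactory(conf);

        // Serialize one IntWritable through the factory-provided serializer.
        Serializer<IntWritable> ser = factory.getSerializer(IntWritable.class);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ser.open(bytes);
        ser.serialize(new IntWritable(42));
        ser.close();

        // Read it back with the matching deserializer.
        Deserializer<IntWritable> deser =
            factory.getDeserializer(IntWritable.class);
        deser.open(new ByteArrayInputStream(bytes.toByteArray()));
        IntWritable copy = deser.deserialize(null);  // null: allocate a new object
        deser.close();

        System.out.println(copy.get());  // prints 42
      }
    }

Passing a reusable instance to deserialize() instead of null avoids the per-record allocation, which is what the iterators above do on their hot path.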

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java Fri Jan 15 23:48:54 2010
@@ -26,11 +26,9 @@
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
-import java.net.URI;
 import java.net.URL;
 import java.net.URLConnection;
-import java.util.Arrays;
-import java.util.Map;
+import java.net.URI;
 
 import javax.security.auth.login.LoginException;
 
@@ -43,7 +41,6 @@
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
-import org.apache.hadoop.mapreduce.lib.jobdata.ClassBasedJobData;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.util.ConfigUtil;
 import org.apache.hadoop.security.UnixUserGroupInformation;
@@ -631,41 +628,32 @@
                   Partitioner.class);
   }
 
-  @Deprecated
   /**
    * Set the key class for the map output data. This allows the user to
    * specify the map output key class to be different than the final output
    * value class.
-   *
-   * Deprecated: Use ClassBasedJobData.setMapOutputKeyClass() instead
-   *
+   * 
    * @param theClass the map output key class.
    * @throws IllegalStateException if the job is submitted
    */
   public void setMapOutputKeyClass(Class<?> theClass
                                    ) throws IllegalStateException {
-    LOG.warn(
-        "Deprecated: Use ClassBasedJobData.setMapOutputKeyClass() instead");
-    ClassBasedJobData.setMapOutputKeyClass(conf, theClass);
+    ensureState(JobState.DEFINE);
+    conf.setMapOutputKeyClass(theClass);
   }
 
-  @Deprecated
   /**
    * Set the value class for the map output data. This allows the user to
    * specify the map output value class to be different than the final output
    * value class.
-   *
-   * Deprecated: Use ClassBasedJobData.setMapOutputValueClass() instead
-   *
+   * 
    * @param theClass the map output value class.
    * @throws IllegalStateException if the job is submitted
    */
   public void setMapOutputValueClass(Class<?> theClass
                                      ) throws IllegalStateException {
-    LOG.warn(
-        "Deprecated: Use ClassBasedJobData.setMapOutputValueClass() "
-        + "instead");
-    ClassBasedJobData.setMapOutputValueClass(conf, theClass);
+    ensureState(JobState.DEFINE);
+    conf.setMapOutputValueClass(theClass);
   }
 
   /**
@@ -868,32 +856,6 @@
   }
 
   /**
-   * Set the metadata used by the serialization framework to instantiate
-   * (de)serializers for key data emitted by mappers.
-   *
-   * @param metadata the metadata used by the serialization framework for
-   * the mapper output key.
-   */
-  public void setMapOutputKeySerializationMetadata(
-      Map<String, String> metadata) {
-    ensureState(JobState.DEFINE);
-    conf.setMap(MAP_OUTPUT_KEY_METADATA, metadata);
-  }
-
-  /**
-   * Set the metadata used by the serialization framework to instantiate
-   * (de)serializers for value data emitted by mappers.
-   *
-   * @param metadata the metadata used by the serialization framework for
-   * the mapper output value.
-   */
-  public void setMapOutputValueSerializationMetadata(
-      Map<String, String> metadata) {
-    ensureState(JobState.DEFINE);
-    conf.setMap(MAP_OUTPUT_VALUE_METADATA, metadata);
-  }
-
-  /**
    * Set whether the system should collect profiler information for some of 
    * the tasks in this job? The information is stored in the user log 
    * directory.
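
With the metadata setters gone, setMapOutputKeyClass and setMapOutputValueClass go back to being plain DEFINE-state-checked delegations to JobConf. A minimal sketch of the restored usage, assuming the Job(Configuration, String) constructor and the JobContext getters available on Job in this era; the job name is a placeholder.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;

    public class MapOutputClassSketch {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "sketch");

        // Legal while the job is still being defined; after submission the
        // same calls throw IllegalStateException.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        System.out.println(job.getMapOutputKeyClass());   // class org.apache.hadoop.io.Text
        System.out.println(job.getMapOutputValueClass()); // class org.apache.hadoop.io.IntWritable
      }
    }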

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java Fri Jan 15 23:48:54 2010
@@ -20,7 +20,6 @@
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -28,8 +27,6 @@
 import org.apache.hadoop.conf.Configuration.IntegerRanges;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.serializer.DeserializerBase;
-import org.apache.hadoop.io.serializer.SerializerBase;
 import org.apache.hadoop.mapreduce.Mapper;
 
 /**
@@ -176,20 +173,10 @@
     "mapreduce.map.output.compress";
   public static final String MAP_OUTPUT_COMPRESS_CODEC = 
     "mapreduce.map.output.compress.codec";
-  @Deprecated
   public static final String MAP_OUTPUT_KEY_CLASS = 
     "mapreduce.map.output.key.class";
-  public static final String MAP_OUTPUT_KEY_METADATA =
-    "mapreduce.map.output.key.metadata";
-  public static final String MAP_OUTPUT_KEY_METADATA_SET =
-    "mapreduce.map.output.key.metadata.jobdata.initialized";
-  @Deprecated
   public static final String MAP_OUTPUT_VALUE_CLASS = 
     "mapreduce.map.output.value.class";
-  public static final String MAP_OUTPUT_VALUE_METADATA =
-    "mapreduce.map.output.value.metadata";
-  public static final String MAP_OUTPUT_VALUE_METADATA_SET =
-    "mapreduce.map.output.value.metadata.jobdata.initialized";
   public static final String MAP_OUTPUT_KEY_FIELD_SEPERATOR = 
     "mapreduce.map.output.key.field.separator";
   public static final String MAP_LOG_LEVEL = "mapreduce.map.log.level";
@@ -282,24 +269,19 @@
    */
   public Class<?> getOutputValueClass();
 
-  @Deprecated
   /**
    * Get the key class for the map output data. If it is not set, use the
    * (final) output key class. This allows the map output key class to be
    * different than the final output key class.
-   *
-   * (Deprecated: Use ClassBasedJobData.getMapOutputKeyClass())
    * @return the map output key class.
    */
   public Class<?> getMapOutputKeyClass();
 
-  @Deprecated
   /**
    * Get the value class for the map output data. If it is not set, use the
    * (final) output value class This allows the map output value class to be
    * different than the final output value class.
-   *
-   * (Deprecated: Use ClassBasedJobData.getMapOutputValueClass())
+   *  
    * @return the map output value class.
    */
   public Class<?> getMapOutputValueClass();
@@ -361,34 +343,6 @@
      throws ClassNotFoundException;
 
   /**
-   * Get the serializer to encode keys from the mapper.
-   *
-   * @return the {@link SerializerBase} for the mapper output keys.
-   */
-  public <T> SerializerBase<T> getMapOutputKeySerializer();
-
-  /**
-   * Get the deserializer to decode keys from the mapper.
-   *
-   * @return the {@link DeserializerBase} for the mapper output keys.
-   */
-  public <T> DeserializerBase<T> getMapOutputKeyDeserializer();
-
-  /**
-   * Get the serializer to encode values from the mapper.
-   *
-   * @return the {@link SerializerBase} for the mapper output values.
-   */
-  public <T> SerializerBase<T> getMapOutputValueSerializer();
-
-  /**
-   * Get the deserializer to decode values from the mapper.
-   *
-   * @return the {@link DeserializerBase} for the mapper output values.
-   */
-  public <T> DeserializerBase<T> getMapOutputValueDeserializer();
-
-  /**
    * Get the {@link RawComparator} comparator used to compare keys.
    * 
    * @return the {@link RawComparator} comparator used to compare keys.
@@ -527,21 +481,4 @@
    */
   public int getMaxReduceAttempts();
 
-  /**
-   * Get the metadata used by the serialization framework to instantiate
-   * (de)serializers for key data emitted by mappers.
-   *
-   * @return the metadata used by the serialization framework for the mapper
-   * output key.
-   */
-  public Map<String, String> getMapOutputKeySerializationMetadata();
-
-  /**
-   * Get the metadata used by the serialization framework to instantiate
-   * (de)serializers for value data emitted by mappers.
-   *
-   * @return the metadata used by the serialization framework for the mapper
-   * output value.
-   */
-  public Map<String, String> getMapOutputValueSerializationMetadata();
 }
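
Dropping the *.metadata keys means the map output types are once again carried in the Configuration as plain class-name strings under the two restored constants. A short sketch of what that looks like at the Configuration level; the constant value is quoted inline for clarity.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;

    public class MapOutputConfigKeySketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // MAP_OUTPUT_KEY_CLASS == "mapreduce.map.output.key.class"
        conf.setClass("mapreduce.map.output.key.class", Text.class, Object.class);

        // The entry is stored as an ordinary class-name string...
        System.out.println(conf.get("mapreduce.map.output.key.class"));
        // ...and resolved back to a Class on read.
        Class<?> keyClass = conf.getClass("mapreduce.map.output.key.class", null);
        System.out.println(keyClass);  // class org.apache.hadoop.io.Text
      }
    }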

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java Fri Jan 15 23:48:54 2010
@@ -18,15 +18,12 @@
 package org.apache.hadoop.mapreduce.lib.chain;
 
 import java.io.IOException;
-import java.util.Map;
 import java.net.URI;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration.IntegerRanges;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.serializer.DeserializerBase;
-import org.apache.hadoop.io.serializer.SerializerBase;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -257,36 +254,6 @@
   }
 
   @Override
-  public <T> SerializerBase<T> getMapOutputKeySerializer() {
-    return base.getMapOutputKeySerializer();
-  }
-
-  @Override
-  public <T> SerializerBase<T> getMapOutputValueSerializer() {
-    return base.getMapOutputValueSerializer();
-  }
-
-  @Override
-  public <T> DeserializerBase<T> getMapOutputKeyDeserializer() {
-    return base.getMapOutputKeyDeserializer();
-  }
-
-  @Override
-  public <T> DeserializerBase<T> getMapOutputValueDeserializer() {
-    return base.getMapOutputValueDeserializer();
-  }
-
-  @Override
-  public Map<String, String> getMapOutputKeySerializationMetadata() {
-    return base.getMapOutputKeySerializationMetadata();
-  }
-
-  @Override
-  public Map<String, String> getMapOutputValueSerializationMetadata() {
-    return base.getMapOutputValueSerializationMetadata();
-  }
-
-  @Override
   public Class<? extends Partitioner<?, ?>> getPartitionerClass()
       throws ClassNotFoundException {
     return base.getPartitionerClass();

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java Fri Jan 15 23:48:54 2010
@@ -18,15 +18,12 @@
 package org.apache.hadoop.mapreduce.lib.chain;
 
 import java.io.IOException;
-import java.util.Map;
 import java.net.URI;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration.IntegerRanges;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.serializer.DeserializerBase;
-import org.apache.hadoop.io.serializer.SerializerBase;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.JobID;
@@ -250,36 +247,6 @@
   }
 
   @Override
-  public <T> SerializerBase<T> getMapOutputKeySerializer() {
-    return base.getMapOutputKeySerializer();
-  }
-
-  @Override
-  public <T> SerializerBase<T> getMapOutputValueSerializer() {
-    return base.getMapOutputValueSerializer();
-  }
-
-  @Override
-  public <T> DeserializerBase<T> getMapOutputKeyDeserializer() {
-    return base.getMapOutputKeyDeserializer();
-  }
-
-  @Override
-  public <T> DeserializerBase<T> getMapOutputValueDeserializer() {
-    return base.getMapOutputValueDeserializer();
-  }
-
-  @Override
-  public Map<String, String> getMapOutputKeySerializationMetadata() {
-    return base.getMapOutputKeySerializationMetadata();
-  }
-
-  @Override
-  public Map<String, String> getMapOutputValueSerializationMetadata() {
-    return base.getMapOutputValueSerializationMetadata();
-  }
-
-  @Override
   public Class<? extends Partitioner<?, ?>> getPartitionerClass()
       throws ClassNotFoundException {
     return base.getPartitionerClass();

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java Fri Jan 15 23:48:54 2010
@@ -20,14 +20,11 @@
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration.IntegerRanges;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.serializer.DeserializerBase;
-import org.apache.hadoop.io.serializer.SerializerBase;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -244,48 +241,16 @@
     }
 
     @Override
-    @Deprecated
     public Class<?> getOutputKeyClass() {
       return mapContext.getOutputKeyClass();
     }
 
     @Override
-    @Deprecated
     public Class<?> getOutputValueClass() {
       return mapContext.getOutputValueClass();
     }
 
     @Override
-    public <T> SerializerBase<T> getMapOutputKeySerializer() {
-      return mapContext.getMapOutputKeySerializer();
-    }
-
-    @Override
-    public <T> SerializerBase<T> getMapOutputValueSerializer() {
-      return mapContext.getMapOutputValueSerializer();
-    }
-
-    @Override
-    public <T> DeserializerBase<T> getMapOutputKeyDeserializer() {
-      return mapContext.getMapOutputKeyDeserializer();
-    }
-
-    @Override
-    public <T> DeserializerBase<T> getMapOutputValueDeserializer() {
-      return mapContext.getMapOutputValueDeserializer();
-    }
-
-    @Override
-    public Map<String, String> getMapOutputKeySerializationMetadata() {
-      return mapContext.getMapOutputKeySerializationMetadata();
-    }
-
-    @Override
-    public Map<String, String> getMapOutputValueSerializationMetadata() {
-      return mapContext.getMapOutputValueSerializationMetadata();
-    }
-
-    @Override
     public Class<? extends Partitioner<?, ?>> getPartitionerClass()
         throws ClassNotFoundException {
       return mapContext.getPartitionerClass();

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java Fri Jan 15 23:48:54 2010
@@ -20,14 +20,11 @@
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration.IntegerRanges;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.serializer.DeserializerBase;
-import org.apache.hadoop.io.serializer.SerializerBase;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.JobID;
@@ -200,13 +197,11 @@
     }
 
     @Override
-    @Deprecated
     public Class<?> getMapOutputKeyClass() {
       return reduceContext.getMapOutputKeyClass();
     }
 
     @Override
-    @Deprecated
     public Class<?> getMapOutputValueClass() {
       return reduceContext.getMapOutputValueClass();
     }
@@ -249,36 +244,6 @@
     }
 
     @Override
-    public <T> SerializerBase<T> getMapOutputKeySerializer() {
-      return reduceContext.getMapOutputKeySerializer();
-    }
-
-    @Override
-    public <T> SerializerBase<T> getMapOutputValueSerializer() {
-      return reduceContext.getMapOutputValueSerializer();
-    }
-
-    @Override
-    public <T> DeserializerBase<T> getMapOutputKeyDeserializer() {
-      return reduceContext.getMapOutputKeyDeserializer();
-    }
-
-    @Override
-    public <T> DeserializerBase<T> getMapOutputValueDeserializer() {
-      return reduceContext.getMapOutputValueDeserializer();
-    }
-
-    @Override
-    public Map<String, String> getMapOutputKeySerializationMetadata() {
-      return reduceContext.getMapOutputKeySerializationMetadata();
-    }
-
-    @Override
-    public Map<String, String> getMapOutputValueSerializationMetadata() {
-      return reduceContext.getMapOutputValueSerializationMetadata();
-    }
-
-    @Override
     public Class<? extends Partitioner<?, ?>> getPartitionerClass()
         throws ClassNotFoundException {
       return reduceContext.getPartitionerClass();
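
ChainMapContextImpl, ChainReduceContextImpl, WrappedMapper.Context and WrappedReducer.Context are all pure delegating wrappers, so removing the six serialization accessors from JobContext removes one forwarding method per accessor from each of them. A minimal sketch of the pattern; BaseContext and ForwardingContext are hypothetical names, not classes from the Hadoop tree.

    // Every interface method is forwarded verbatim, so each method added to
    // or removed from the interface touches every wrapper in the same way.
    interface BaseContext {
      Class<?> getMapOutputKeyClass();
      Class<?> getMapOutputValueClass();
    }

    class ForwardingContext implements BaseContext {
      private final BaseContext base;

      ForwardingContext(BaseContext base) {
        this.base = base;
      }

      @Override
      public Class<?> getMapOutputKeyClass() {
        return base.getMapOutputKeyClass();
      }

      @Override
      public Class<?> getMapOutputValueClass() {
        return base.getMapOutputValueClass();
      }
    }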

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java Fri Jan 15 23:48:54 2010
@@ -20,17 +20,11 @@
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration.IntegerRanges;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.serializer.DeserializerBase;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.SerializerBase;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -41,7 +35,6 @@
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.jobdata.ClassBasedJobData;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 
@@ -51,8 +44,6 @@
  */
 public class JobContextImpl implements JobContext {
 
-  private final static Log LOG = LogFactory.getLog(JobContextImpl.class.getName());
-
   protected final org.apache.hadoop.mapred.JobConf conf;
   private final JobID jobId;
   
@@ -111,7 +102,6 @@
     return conf.getOutputValueClass();
   }
 
-  @Deprecated
   /**
    * Get the key class for the map output data. If it is not set, use the
    * (final) output key class. This allows the map output key class to be
@@ -119,12 +109,9 @@
    * @return the map output key class.
    */
   public Class<?> getMapOutputKeyClass() {
-    LOG.warn(
-        "Deprecated: Use ClassBasedJobData.getMapOutputKeyClass() instead");
-    return ClassBasedJobData.getMapOutputKeyClass(conf);
+    return conf.getMapOutputKeyClass();
   }
 
-  @Deprecated
   /**
    * Get the value class for the map output data. If it is not set, use the
    * (final) output value class This allows the map output value class to be
@@ -133,9 +120,7 @@
    * @return the map output value class.
    */
   public Class<?> getMapOutputValueClass() {
-    LOG.warn("Deprecated: Use ClassBasedJobData.getMapOutputValueClass() "
-        + "instead");
-    return ClassBasedJobData.getMapOutputValueClass(conf);
+    return conf.getMapOutputValueClass();
   }
 
   /**
@@ -221,42 +206,6 @@
   }
 
   /**
-   * Get the serializer to encode keys from the mapper.
-   *
-   * @return the {@link SerializerBase} for the mapper output keys.
-   */
-  public <T> SerializerBase<T> getMapOutputKeySerializer() {
-    return conf.getMapOutputKeySerializer();
-  }
-
-  /**
-   * Get the deserializer to decode keys from the mapper.
-   *
-   * @return the {@link DeserializerBase} for the mapper output keys.
-   */
-  public <T> DeserializerBase<T> getMapOutputKeyDeserializer() {
-    return conf.getMapOutputKeyDeserializer();
-  }
-
-  /**
-   * Get the serializer to encode values from the mapper.
-   *
-   * @return the {@link SerializerBase} for the mapper output values.
-   */
-  public <T> SerializerBase<T> getMapOutputValueSerializer() {
-    return conf.getMapOutputValueSerializer();
-  }
-
-  /**
-   * Get the deserializer to decode values from the mapper.
-   *
-   * @return the {@link DeserializerBase} for the mapper output values.
-   */
-  public <T> DeserializerBase<T> getMapOutputValueDeserializer() {
-    return conf.getMapOutputValueDeserializer();
-  }
-
-  /**
    * Get the {@link RawComparator} comparator used to compare keys.
    * 
    * @return the {@link RawComparator} comparator used to compare keys.
@@ -434,26 +383,5 @@
   public String getUser() {
     return conf.getUser();
   }
-
-  /**
-   * Get the metadata used by the serialization framework to instantiate
-   * (de)serializers for key data emitted by mappers.
-   *
-   * @return the metadata used by the serialization framework for the mapper
-   * output key.
-   */
-  public Map<String, String> getMapOutputKeySerializationMetadata() {
-    return conf.getMapOutputKeySerializationMetadata();
-  }
-
-  /**
-   * Get the metadata used by the serialization framework to instantiate
-   * (de)serializers for value data emitted by mappers.
-   *
-   * @return the metadata used by the serialization framework for the mapper
-   * output value.
-   */
-  public Map<String, String> getMapOutputValueSerializationMetadata() {
-    return conf.getMapOutputValueSerializationMetadata();
-  }
+  
 }
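
JobContextImpl now delegates getMapOutputKeyClass and getMapOutputValueClass straight to JobConf, which, per the javadoc above, falls back to the final output classes when no map output class was set. A minimal sketch of that fallback against a default local JobConf.

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;

    public class MapOutputFallbackSketch {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        // Nothing set explicitly, so the getters fall back to the job's
        // (final) output classes.
        System.out.println(conf.getMapOutputKeyClass());   // Text
        System.out.println(conf.getMapOutputValueClass()); // IntWritable

        // An explicit map output class takes precedence over the fallback.
        conf.setMapOutputKeyClass(IntWritable.class);
        System.out.println(conf.getMapOutputKeyClass());   // IntWritable
      }
    }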

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/ReduceContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/ReduceContextImpl.java?rev=899844&r1=899843&r2=899844&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/ReduceContextImpl.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/ReduceContextImpl.java Fri Jan 15 23:48:54 2010
@@ -21,7 +21,6 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.Map;
 import java.util.NoSuchElementException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -71,8 +70,8 @@
   private boolean isMarked = false;
   private BackupStore<KEYIN,VALUEIN> backupStore;
   private final SerializationFactory serializationFactory;
-  private final Map<String, String> keyMetadata;
-  private final Map<String, String> valueMetadata;
+  private final Class<KEYIN> keyClass;
+  private final Class<VALUEIN> valueClass;
   private final Configuration conf;
   private final TaskAttemptID taskid;
   private int currentKeyLength = -1;
@@ -86,8 +85,8 @@
                            OutputCommitter committer,
                            StatusReporter reporter,
                            RawComparator<KEYIN> comparator,
-                           Map<String, String> keyMetadata,
-                           Map<String, String> valueMetadata
+                           Class<KEYIN> keyClass,
+                           Class<VALUEIN> valueClass
                           ) throws InterruptedException, IOException{
     super(conf, taskid, output, committer, reporter);
     this.input = input;
@@ -95,13 +94,13 @@
     this.inputValueCounter = inputValueCounter;
     this.comparator = comparator;
     this.serializationFactory = new SerializationFactory(conf);
-    this.keyMetadata = keyMetadata;
-    this.valueMetadata = valueMetadata;
-    this.keyDeserializer = serializationFactory.getDeserializer(keyMetadata);
+    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
     this.keyDeserializer.open(buffer);
-    this.valueDeserializer = serializationFactory.getDeserializer(valueMetadata);
+    this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
     this.valueDeserializer.open(buffer);
     hasMore = input.next();
+    this.keyClass = keyClass;
+    this.valueClass = valueClass;
     this.conf = conf;
     this.taskid = taskid;
   }
@@ -327,12 +326,12 @@
       WritableUtils.writeVInt(out, currentKeyLength);
       WritableUtils.writeVInt(out, currentValueLength);
       Serializer<KEYIN> keySerializer = 
-        serializationFactory.getSerializer(keyMetadata);
+        serializationFactory.getSerializer(keyClass);
       keySerializer.open(out);
       keySerializer.serialize(getCurrentKey());
 
       Serializer<VALUEIN> valueSerializer = 
-        serializationFactory.getSerializer(valueMetadata);
+        serializationFactory.getSerializer(valueClass);
       valueSerializer.open(out);
       valueSerializer.serialize(getCurrentValue());
     }
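
In the mark/reset backup path above, each cached record is framed as two vints (key length, value length) followed by the serialized key and value bytes. A minimal sketch of that framing, using fixed byte arrays as stand-ins for serializer output.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;

    import org.apache.hadoop.io.WritableUtils;

    public class BackupFramingSketch {
      public static void main(String[] args) throws Exception {
        byte[] keyBytes = {1, 2, 3};  // stand-in for the serialized key
        byte[] valBytes = {4, 5};     // stand-in for the serialized value

        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);

        // VInt(keyLen), VInt(valLen), then the raw record bytes.
        WritableUtils.writeVInt(out, keyBytes.length);
        WritableUtils.writeVInt(out, valBytes.length);
        out.write(keyBytes);
        out.write(valBytes);
        out.flush();

        System.out.println(buf.size() + " bytes buffered");  // 7 for this record
      }
    }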


