hadoop-mapreduce-commits mailing list archives

From: cutt...@apache.org
Subject: svn commit: r898019 [1/2] - in /hadoop/mapreduce/trunk: ./ src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/ src/examples/org/apache/hadoop/examples/ src/examples/org/apache/hadoop/examples/pi/ src/java/org/apache/hadoop/mapred/ src/...
Date: Mon, 11 Jan 2010 19:22:43 GMT
Author: cutting
Date: Mon Jan 11 19:22:42 2010
New Revision: 898019

URL: http://svn.apache.org/viewvc?rev=898019&view=rev
Log:
MAPREDUCE-1126. Shuffle now uses Serialization API for comparisons, permitting non-Writable intermediate data.

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/AvroGenericJobData.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/AvroReflectJobData.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/AvroSpecificJobData.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/ClassBasedJobData.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/JavaSerializationJobData.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/SchemaBasedJobData.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/WritableJobData.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestAvroSerialization.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/avro/
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/avro/key.avsc
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/avro/val.avsc
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/build.xml
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContext.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/BaileyBorweinPlouffe.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/DBCountPageView.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/SecondarySort.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IFile.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Merger.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/ReduceContextImpl.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java
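
For context before the per-file diffs: user code now records intermediate (map output) types through the new jobdata helpers rather than Job.setMapOutputKeyClass(). A minimal sketch of the new style, using only the WritableJobData calls that appear in the example diffs below (the Avro*JobData classes added above presumably expose analogous setters for schema-based data, but their signatures are not shown in this message):

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.jobdata.WritableJobData;

    public class IntermediateTypesSketch {
      public static void configure(Job job) {
        // Before this change:
        //   job.setMapOutputKeyClass(Text.class);
        //   job.setMapOutputValueClass(LongWritable.class);
        // After: record the types as serialization metadata, so the shuffle
        // can obtain comparators and (de)serializers through the
        // Serialization API instead of requiring WritableComparable keys.
        WritableJobData.setMapOutputKeyClass(job.getConfiguration(), Text.class);
        WritableJobData.setMapOutputValueClass(job.getConfiguration(),
            LongWritable.class);
      }
    }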

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=898019&r1=898018&r2=898019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Jan 11 19:22:42 2010
@@ -103,6 +103,10 @@
     MAPREDUCE-1295. Add a tool in Rumen for folding and manipulating job
     traces. (Dick King via cdouglas)
 
+    MAPREDUCE-1126. Shuffle now uses Serialization API for
+    comparisons, permitting non-Writable intermediate data.
+    (Aaron Kimball via cutting)
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Modified: hadoop/mapreduce/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/build.xml?rev=898019&r1=898018&r2=898019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/build.xml (original)
+++ hadoop/mapreduce/trunk/build.xml Mon Jan 11 19:22:42 2010
@@ -322,7 +322,7 @@
     </taskdef>
     <protocol destdir="${build.src}">
       <fileset dir="${mapred.src.dir}">
-	<include name="**/*.avpr" />
+        <include name="**/*.avpr" />
       </fileset>
     </protocol>
   </target>
@@ -472,7 +472,20 @@
   <!-- Compile test code                                                  --> 
   <!-- ================================================================== -->
 
-  <target name="compile-mapred-test" depends="compile-mapred-classes, compile-examples, ivy-retrieve-test">
+  <target name="avro-generate-test">
+    <!-- generate avro record instances for tests -->
+    <taskdef name="schema" classname="org.apache.avro.specific.SchemaTask">
+      <classpath refid="classpath" />
+    </taskdef>
+    <schema destdir="${test.generated.dir}">
+      <fileset dir="${test.src.dir}">
+        <include name="**/*.avsc" />
+      </fileset>
+    </schema>
+  </target>
+
+  <target name="compile-mapred-test" depends="compile-mapred-classes, compile-examples,
+      ivy-retrieve-test, avro-generate-test">
 
     <mkdir dir="${test.mapred.build.classes}"/>
     <mkdir dir="${test.mapred.build.testjar}"/>
@@ -480,7 +493,7 @@
 
     <javac 
       encoding="${build.encoding}" 
-      srcdir="${test.src.dir}/mapred;${test.src.dir}/unit"
+      srcdir="${test.src.dir}/mapred;${test.src.dir}/unit;${test.generated.dir}"
       includes="org/apache/hadoop/**/*.java"
       destdir="${test.mapred.build.classes}"
       debug="${javac.debug}"

Modified: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContext.java?rev=898019&r1=898018&r2=898019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContext.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContext.java Mon Jan 11 19:22:42 2010
@@ -20,9 +20,11 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.SerializationBase;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -42,6 +44,13 @@
 
   private MockOutputCollector<KEYOUT, VALUEOUT> output;
 
+  /** Return a metadata map that will instantiate serializers
+   * for Writable classes.
+   */
+  private static Map<String, String> getWritableSerializationMap() {
+    return SerializationBase.getMetadataFromClass(Text.class);
+  }
+
   public MockReduceContext(final List<Pair<KEYIN, List<VALUEIN>>> in, 
                            final Counters counters) 
   throws IOException, InterruptedException {
@@ -49,7 +58,7 @@
           new TaskAttemptID("mrunit-jt", 0, TaskType.REDUCE, 0, 0),
           new MockRawKeyValueIterator(), null, null, null,
           new MockOutputCommitter(), new MockReporter(counters), null,
-          (Class) Text.class, (Class) Text.class);
+          getWritableSerializationMap(), getWritableSerializationMap());
     this.inputIter = in.iterator();
     this.output = new MockOutputCollector<KEYOUT, VALUEOUT>();
   }
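
The mock context illustrates the new ReduceContextImpl contract: serialization-metadata maps replace the old (Class, Class) pair. A sketch of building such a map for a Writable type, using only the SerializationBase call visible in this diff:

    import java.util.Map;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.serializer.SerializationBase;

    public class MetadataMapSketch {
      // The same map MockReduceContext now passes for both keys and values.
      public static Map<String, String> textMetadata() {
        return SerializationBase.getMetadataFromClass(Text.class);
      }
    }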

Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/BaileyBorweinPlouffe.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/BaileyBorweinPlouffe.java?rev=898019&r1=898018&r2=898019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/BaileyBorweinPlouffe.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/BaileyBorweinPlouffe.java Mon Jan 11 19:22:42 2010
@@ -45,6 +45,7 @@
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.jobdata.WritableJobData;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -315,8 +316,10 @@
 
     // setup mapper
     job.setMapperClass(BbpMapper.class);
-    job.setMapOutputKeyClass(LongWritable.class);
-    job.setMapOutputValueClass(BytesWritable.class);
+    WritableJobData.setMapOutputKeyClass(job.getConfiguration(),
+        LongWritable.class);
+    WritableJobData.setMapOutputValueClass(job.getConfiguration(),
+        BytesWritable.class);
 
     // setup reducer
     job.setReducerClass(BbpReducer.class);

Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/DBCountPageView.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/DBCountPageView.java?rev=898019&r1=898018&r2=898019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/DBCountPageView.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/DBCountPageView.java Mon Jan 11 19:22:42 2010
@@ -45,6 +45,7 @@
 import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
 import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.hadoop.mapreduce.lib.jobdata.WritableJobData;
 import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
@@ -399,8 +400,10 @@
 
     DBOutputFormat.setOutput(job, "Pageview", PageviewFieldNames);
     
-    job.setMapOutputKeyClass(Text.class);
-    job.setMapOutputValueClass(LongWritable.class);
+    WritableJobData.setMapOutputKeyClass(job.getConfiguration(),
+        Text.class);
+    WritableJobData.setMapOutputValueClass(job.getConfiguration(),
+        LongWritable.class);
 
     job.setOutputKeyClass(PageviewRecord.class);
     job.setOutputValueClass(NullWritable.class);

Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/SecondarySort.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/SecondarySort.java?rev=898019&r1=898018&r2=898019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/SecondarySort.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/SecondarySort.java Mon Jan 11 19:22:42 2010
@@ -32,6 +32,7 @@
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.jobdata.WritableJobData;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -224,8 +225,10 @@
     job.setGroupingComparatorClass(FirstGroupingComparator.class);
 
     // the map output is IntPair, IntWritable
-    job.setMapOutputKeyClass(IntPair.class);
-    job.setMapOutputValueClass(IntWritable.class);
+    WritableJobData.setMapOutputKeyClass(job.getConfiguration(),
+        IntPair.class);
+    WritableJobData.setMapOutputValueClass(job.getConfiguration(),
+        IntWritable.class);
 
     // the reduce output is Text, IntWritable
     job.setOutputKeyClass(Text.class);

Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java?rev=898019&r1=898018&r2=898019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/pi/DistSum.java Mon Jan 11 19:22:42 2010
@@ -51,6 +51,7 @@
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.mapreduce.lib.jobdata.WritableJobData;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -247,8 +248,10 @@
     public void init(Job job) {
       // setup mapper
       job.setMapperClass(SummingMapper.class);
-      job.setMapOutputKeyClass(NullWritable.class);
-      job.setMapOutputValueClass(TaskResult.class);
+      WritableJobData.setMapOutputKeyClass(job.getConfiguration(),
+          NullWritable.class);
+      WritableJobData.setMapOutputValueClass(job.getConfiguration(),
+          TaskResult.class);
 
       // zero reducer
       job.setNumReduceTasks(0);
@@ -300,8 +303,10 @@
     public void init(Job job) {
       // setup mapper
       job.setMapperClass(PartitionMapper.class);
-      job.setMapOutputKeyClass(IntWritable.class);
-      job.setMapOutputValueClass(SummationWritable.class);
+      WritableJobData.setMapOutputKeyClass(job.getConfiguration(),
+          IntWritable.class);
+      WritableJobData.setMapOutputValueClass(job.getConfiguration(),
+          SummationWritable.class);
 
       // setup partitioner
       job.setPartitionerClass(IndexPartitioner.class);
@@ -603,4 +608,4 @@
   public static void main(String[] args) throws Exception {
     System.exit(ToolRunner.run(null, new DistSum(), args));
   }
-}
\ No newline at end of file
+}

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IFile.java?rev=898019&r1=898018&r2=898019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IFile.java Mon Jan 11 19:22:42 2010
@@ -37,8 +37,7 @@
 import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.io.serializer.SerializerBase;
 
 /**
  * <code>IFile</code> is the simple <key-len, value-len, key, value> format
@@ -78,18 +77,16 @@
 
     IFileOutputStream checksumOut;
 
-    Class<K> keyClass;
-    Class<V> valueClass;
-    Serializer<K> keySerializer;
-    Serializer<V> valueSerializer;
+    SerializerBase<K> keySerializer;
+    SerializerBase<V> valueSerializer;
     
     DataOutputBuffer buffer = new DataOutputBuffer();
 
     public Writer(Configuration conf, FileSystem fs, Path file, 
-                  Class<K> keyClass, Class<V> valueClass,
+                  boolean createSerializers,
                   CompressionCodec codec,
                   Counters.Counter writesCounter) throws IOException {
-      this(conf, fs.create(file), keyClass, valueClass, codec,
+      this(conf, fs.create(file), createSerializers, codec,
            writesCounter);
       ownOutputStream = true;
     }
@@ -99,7 +96,7 @@
     }
 
     public Writer(Configuration conf, FSDataOutputStream out, 
-        Class<K> keyClass, Class<V> valueClass,
+        boolean createSerializers,
         CompressionCodec codec, Counters.Counter writesCounter)
         throws IOException {
       this.writtenRecordsCounter = writesCounter;
@@ -117,30 +114,27 @@
         this.out = new FSDataOutputStream(checksumOut,null);
       }
       
-      this.keyClass = keyClass;
-      this.valueClass = valueClass;
 
-      if (keyClass != null) {
-        SerializationFactory serializationFactory = 
-          new SerializationFactory(conf);
-        this.keySerializer = serializationFactory.getSerializer(keyClass);
+      if (createSerializers) {
+        JobConf job = new JobConf(conf);
+        this.keySerializer = job.getMapOutputKeySerializer();
+        this.valueSerializer = job.getMapOutputValueSerializer();
         this.keySerializer.open(buffer);
-        this.valueSerializer = serializationFactory.getSerializer(valueClass);
         this.valueSerializer.open(buffer);
       }
     }
 
     public Writer(Configuration conf, FileSystem fs, Path file) 
     throws IOException {
-      this(conf, fs, file, null, null, null, null);
+      this(conf, fs, file, false, null, null);
     }
 
     public void close() throws IOException {
 
       // When IFile writer is created by BackupStore, we do not have
-      // Key and Value classes set. So, check before closing the
+      // serializers created. So, check before closing the
       // serializers
-      if (keyClass != null) {
+      if (null != keySerializer) {
         keySerializer.close();
         valueSerializer.close();
       }
@@ -183,12 +177,6 @@
     }
 
     public void append(K key, V value) throws IOException {
-      if (key.getClass() != keyClass)
-        throw new IOException("wrong key class: "+ key.getClass()
-                              +" is not "+ keyClass);
-      if (value.getClass() != valueClass)
-        throw new IOException("wrong value class: "+ value.getClass()
-                              +" is not "+ valueClass);
 
       // Append the 'key'
       keySerializer.serialize(key);
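
With the key/value Class parameters gone, IFile.Writer decides at construction time whether to build serializers at all, pulling them from the job's map-output metadata when asked. A sketch of the call pattern, placed in org.apache.hadoop.mapred on the assumption that IFile remains package-visible:

    package org.apache.hadoop.mapred;

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.io.compress.CompressionCodec;

    class IFileWriterSketch {
      static <K, V> IFile.Writer<K, V> forSpill(JobConf job,
          FSDataOutputStream out, CompressionCodec codec,
          Counters.Counter counter) throws IOException {
        // 'true' asks the Writer to resolve key/value serializers via
        // job.getMapOutputKeySerializer()/getMapOutputValueSerializer();
        // BackupStore-style callers pass 'false' and never touch them.
        return new IFile.Writer<K, V>(job, out, true, codec, counter);
      }
    }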

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=898019&r1=898018&r2=898019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Mon Jan 11 19:22:42 2010
@@ -24,6 +24,8 @@
 import java.net.URL;
 import java.net.URLDecoder;
 import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
@@ -36,6 +38,11 @@
 
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.serializer.DeserializerBase;
+import org.apache.hadoop.io.serializer.SerializationBase;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.SerializerBase;
+import org.apache.hadoop.io.serializer.WritableSerialization;
 
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
@@ -43,6 +50,7 @@
 import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
 import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
 import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.lib.jobdata.ClassBasedJobData;
 import org.apache.hadoop.mapreduce.util.ConfigUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
@@ -705,6 +713,111 @@
   }
   
   /**
+   * Get the metadata used by the serialization framework to instantiate
+   * (de)serializers for key data emitted by mappers.
+   *
+   * @return the metadata used by the serialization framework for the mapper
+   * output key.
+   */
+  public Map<String, String> getMapOutputKeySerializationMetadata() {
+    Map<String, String> metadata = null;
+    if (getBoolean(JobContext.MAP_OUTPUT_KEY_METADATA_SET, false)) {
+      metadata = getMap(JobContext.MAP_OUTPUT_KEY_METADATA);
+    } else {
+      // Wasn't set via the new metadata map. Maybe it was set by the deprecated
+      // class name?
+      metadata = new HashMap<String, String>();
+      String deprecatedKeyClassName = get(JobContext.MAP_OUTPUT_KEY_CLASS);
+      if (null != deprecatedKeyClassName) {
+        // Yes, it was.
+        metadata.put(WritableSerialization.CLASS_KEY, deprecatedKeyClassName);
+      } else {
+        // Intermediate (k, v) types weren't explicitly set. Build a metadata
+        // map based on the final (k, v) types.
+        metadata.put(WritableSerialization.CLASS_KEY,
+            getOutputKeyClass().getName());
+      }
+    }
+
+    return metadata;
+  }
+
+  /**
+   * Get the metadata used by the serialization framework to instantiate
+   * (de)serializers for value data emitted by mappers.
+   *
+   * @return the metadata used by the serialization framework for the mapper
+   * output value.
+   */
+  public Map<String, String> getMapOutputValueSerializationMetadata() {
+    Map<String, String> metadata = null;
+    if (getBoolean(JobContext.MAP_OUTPUT_VALUE_METADATA_SET, false)) {
+      metadata = getMap(JobContext.MAP_OUTPUT_VALUE_METADATA);
+    } else {
+      // Wasn't set via the new metadata map. Maybe it was set by the deprecated
+      // class name?
+      metadata = new HashMap<String, String>();
+      String deprecatedValClassName = get(JobContext.MAP_OUTPUT_VALUE_CLASS);
+      if (null != deprecatedValClassName) {
+        // Yes, it was.
+        metadata.put(WritableSerialization.CLASS_KEY, deprecatedValClassName);
+      } else {
+        // Intermediate (k, v) types weren't explicitly set. Build a metadata
+        // map based on the final (k, v) types.
+        metadata.put(WritableSerialization.CLASS_KEY,
+            getOutputValueClass().getName());
+      }
+    }
+
+    return metadata;
+  }
+
+  /**
+   * Get the serializer to encode keys from the mapper.
+   *
+   * @return the {@link SerializerBase} for the mapper output keys.
+   */
+  public <T> SerializerBase<T> getMapOutputKeySerializer() {
+    Map<String, String> metadata = getMapOutputKeySerializationMetadata();
+    SerializationFactory factory = new SerializationFactory(this);
+    return factory.getSerializer(metadata);
+  }
+
+  /**
+   * Get the deserializer to decode keys from the mapper.
+   *
+   * @return the {@link DeserializerBase} for the mapper output keys.
+   */
+  public <T> DeserializerBase<T> getMapOutputKeyDeserializer() {
+    Map<String, String> metadata = getMapOutputKeySerializationMetadata();
+    SerializationFactory factory = new SerializationFactory(this);
+    return factory.getDeserializer(metadata);
+  }
+
+  /**
+   * Get the serializer to encode values from the mapper.
+   *
+   * @return the {@link SerializerBase} for the mapper output values.
+   */
+  public <T> SerializerBase<T> getMapOutputValueSerializer() {
+    Map<String, String> metadata = getMapOutputValueSerializationMetadata();
+    SerializationFactory factory = new SerializationFactory(this);
+    return factory.getSerializer(metadata);
+  }
+
+  /**
+   * Get the deserializer to decode values from the mapper.
+   *
+   * @return the {@link DeserializerBase} for the mapper output values.
+   */
+  public <T> DeserializerBase<T> getMapOutputValueDeserializer() {
+    Map<String, String> metadata = getMapOutputValueSerializationMetadata();
+    SerializationFactory factory = new SerializationFactory(this);
+    return factory.getDeserializer(metadata);
+  }
+
+  @Deprecated
+  /**
    * Get the key class for the map output data. If it is not set, use the
    * (final) output key class. This allows the map output key class to be
    * different than the final output key class.
@@ -712,13 +825,10 @@
    * @return the map output key class.
    */
   public Class<?> getMapOutputKeyClass() {
-    Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class);
-    if (retv == null) {
-      retv = getOutputKeyClass();
-    }
-    return retv;
+    return ClassBasedJobData.getMapOutputKeyClass(this);
   }
   
+  @Deprecated
   /**
    * Set the key class for the map output data. This allows the user to
    * specify the map output key class to be different than the final output
@@ -727,9 +837,10 @@
    * @param theClass the map output key class.
    */
   public void setMapOutputKeyClass(Class<?> theClass) {
-    setClass(JobContext.MAP_OUTPUT_KEY_CLASS, theClass, Object.class);
+    ClassBasedJobData.setMapOutputKeyClass(this, theClass);
   }
   
+  @Deprecated
   /**
    * Get the value class for the map output data. If it is not set, use the
    * (final) output value class This allows the map output value class to be
@@ -738,14 +849,10 @@
    * @return the map output value class.
    */
   public Class<?> getMapOutputValueClass() {
-    Class<?> retv = getClass(JobContext.MAP_OUTPUT_VALUE_CLASS, null,
-        Object.class);
-    if (retv == null) {
-      retv = getOutputValueClass();
-    }
-    return retv;
+    return ClassBasedJobData.getMapOutputValueClass(this);
   }
   
+  @Deprecated
   /**
    * Set the value class for the map output data. This allows the user to
    * specify the map output value class to be different than the final output
@@ -754,7 +861,7 @@
    * @param theClass the map output value class.
    */
   public void setMapOutputValueClass(Class<?> theClass) {
-    setClass(JobContext.MAP_OUTPUT_VALUE_CLASS, theClass, Object.class);
+    ClassBasedJobData.setMapOutputValueClass(this, theClass);
   }
   
   /**
@@ -781,12 +888,16 @@
    * 
    * @return the {@link RawComparator} comparator used to compare keys.
    */
+  @SuppressWarnings("unchecked")
   public RawComparator getOutputKeyComparator() {
     Class<? extends RawComparator> theClass = getClass(
       JobContext.KEY_COMPARATOR, null, RawComparator.class);
     if (theClass != null)
       return ReflectionUtils.newInstance(theClass, this);
-    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));
+    SerializationFactory factory = new SerializationFactory(this);
+    Map<String, String> metadata = getMapOutputKeySerializationMetadata();
+    SerializationBase serialization = factory.getSerialization(metadata);
+    return serialization.getRawComparator(metadata);
   }
 
   /**
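
The new JobConf accessors resolve intermediate-type metadata with a three-step fallback; the sketch below exercises them (constant names taken from the diff; the underlying config strings are not shown in this message):

    package org.apache.hadoop.mapred;

    import java.util.Map;
    import org.apache.hadoop.io.serializer.SerializerBase;

    class MetadataFallbackSketch {
      static void demo(JobConf job) {
        // Resolution order in getMapOutputKeySerializationMetadata():
        //   1. the explicit metadata map (JobContext.MAP_OUTPUT_KEY_METADATA)
        //      when JobContext.MAP_OUTPUT_KEY_METADATA_SET is true;
        //   2. the deprecated JobContext.MAP_OUTPUT_KEY_CLASS setting;
        //   3. the job's final output key class.
        Map<String, String> meta = job.getMapOutputKeySerializationMetadata();
        // The factory-backed accessor wraps the same lookup.
        SerializerBase<Object> serializer = job.getMapOutputKeySerializer();
      }
    }

Note also that getOutputKeyComparator() now asks the resolved SerializationBase for a raw comparator instead of assuming WritableComparator, which is the heart of the non-Writable shuffle support.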

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=898019&r1=898018&r2=898019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Mon Jan 11 19:22:42 2010
@@ -48,9 +48,10 @@
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.DeserializerBase;
+import org.apache.hadoop.io.serializer.SerializationBase;
 import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.io.serializer.SerializerBase;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
@@ -346,8 +347,9 @@
      throw wrap;
    }
    SerializationFactory factory = new SerializationFactory(conf);
-   Deserializer<T> deserializer = 
-     (Deserializer<T>) factory.getDeserializer(cls);
+   DeserializerBase<T> deserializer = 
+       (DeserializerBase<T>) factory.getDeserializer(
+       SerializationBase.getMetadataFromClass(cls));
    deserializer.open(inFile);
    T split = deserializer.deserialize(null);
    long pos = inFile.getPos();
@@ -614,8 +616,8 @@
       (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
         ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
     // rebuild the input split
-    org.apache.hadoop.mapreduce.InputSplit split = null;
-    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
+    org.apache.hadoop.mapreduce.InputSplit split =
+        getSplitDetails(new Path(splitIndex.getSplitLocation()),
         splitIndex.getStartOffset());
 
     org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
@@ -710,12 +712,9 @@
     private final int partitions;
     private final JobConf job;
     private final TaskReporter reporter;
-    private final Class<K> keyClass;
-    private final Class<V> valClass;
     private final RawComparator<K> comparator;
-    private final SerializationFactory serializationFactory;
-    private final Serializer<K> keySerializer;
-    private final Serializer<V> valSerializer;
+    private final SerializerBase<K> keySerializer;
+    private final SerializerBase<V> valSerializer;
     private final CombinerRunner<K,V> combinerRunner;
     private final CombineOutputCollector<K, V> combineCollector;
     
@@ -815,12 +814,9 @@
       LOG.info("record buffer = " + softRecordLimit + "/" + kvoffsets.length);
       // k/v serialization
       comparator = job.getOutputKeyComparator();
-      keyClass = (Class<K>)job.getMapOutputKeyClass();
-      valClass = (Class<V>)job.getMapOutputValueClass();
-      serializationFactory = new SerializationFactory(job);
-      keySerializer = serializationFactory.getSerializer(keyClass);
+      keySerializer = job.getMapOutputKeySerializer();
       keySerializer.open(bb);
-      valSerializer = serializationFactory.getSerializer(valClass);
+      valSerializer = job.getMapOutputValueSerializer();
       valSerializer.open(bb);
       // counters
       mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
@@ -867,16 +863,6 @@
     public synchronized void collect(K key, V value, int partition
                                      ) throws IOException {
       reporter.progress();
-      if (key.getClass() != keyClass) {
-        throw new IOException("Type mismatch in key from map: expected "
-                              + keyClass.getName() + ", recieved "
-                              + key.getClass().getName());
-      }
-      if (value.getClass() != valClass) {
-        throw new IOException("Type mismatch in value from map: expected "
-                              + valClass.getName() + ", recieved "
-                              + value.getClass().getName());
-      }
       final int kvnext = (kvindex + 1) % kvoffsets.length;
       if (--recordRemaining <= 0) {
         // Possible for check to remain < zero, if soft limit remains
@@ -1291,8 +1277,7 @@
           IFile.Writer<K, V> writer = null;
           try {
             long segmentStart = out.getPos();
-            writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
-                                      spilledRecordsCounter);
+            writer = new Writer<K, V>(job, out, true, codec, spilledRecordsCounter);
             if (combinerRunner == null) {
               // spill directly
               DataInputBuffer key = new DataInputBuffer();
@@ -1380,8 +1365,8 @@
           try {
             long segmentStart = out.getPos();
             // Create a new codec, don't care!
-            writer = new IFile.Writer<K,V>(job, out, keyClass, valClass, codec,
-                                            spilledRecordsCounter);
+            writer = new IFile.Writer<K,V>(job, out, true, codec,
+                spilledRecordsCounter);
 
             if (i == partition) {
               final long recordStart = out.getPos();
@@ -1540,7 +1525,7 @@
           for (int i = 0; i < partitions; i++) {
             long segmentStart = finalOut.getPos();
             Writer<K, V> writer =
-              new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null);
+              new Writer<K, V>(job, finalOut, true, codec, null);
             writer.close();
             rec.startOffset = segmentStart;
             rec.rawLength = writer.getRawLength();
@@ -1584,17 +1569,15 @@
           //merge
           @SuppressWarnings("unchecked")
           RawKeyValueIterator kvIter = Merger.merge(job, rfs,
-                         keyClass, valClass, codec,
-                         segmentList, mergeFactor,
+                         codec, segmentList, mergeFactor,
                          new Path(mapId.toString()),
                          job.getOutputKeyComparator(), reporter, sortSegments,
                          null, spilledRecordsCounter, sortPhase.phase());
 
           //write merged output to disk
           long segmentStart = finalOut.getPos();
-          Writer<K, V> writer =
-              new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
-                               spilledRecordsCounter);
+          Writer<K, V> writer = new Writer<K, V>(job, finalOut, true, codec,
+              spilledRecordsCounter);
           if (combinerRunner == null || numSpills < minSpillsForCombine) {
             Merger.writeFile(kvIter, writer, reporter, job);
           } else {
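
MapTask's split handling shows the reader-side idiom: class-based deserializer lookups now route through a metadata map. A sketch of that conversion (same calls as in getSplitDetails() above); note too that collect() no longer enforces key.getClass() == keyClass, leaving type policing to the serializers themselves:

    package org.apache.hadoop.mapred;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.serializer.DeserializerBase;
    import org.apache.hadoop.io.serializer.SerializationBase;
    import org.apache.hadoop.io.serializer.SerializationFactory;

    class ClassDeserializerSketch {
      @SuppressWarnings("unchecked")
      static <T> DeserializerBase<T> forClass(Configuration conf, Class<T> cls) {
        // Convert the Class to a metadata map, then let the factory pick
        // the matching serialization.
        SerializationFactory factory = new SerializationFactory(conf);
        return (DeserializerBase<T>) factory.getDeserializer(
            SerializationBase.getMetadataFromClass(cls));
      }
    }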

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Merger.java?rev=898019&r1=898018&r2=898019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Merger.java Mon Jan 11 19:22:42 2010
@@ -54,7 +54,6 @@
 
   public static <K extends Object, V extends Object>
   RawKeyValueIterator merge(Configuration conf, FileSystem fs,
-                            Class<K> keyClass, Class<V> valueClass, 
                             CompressionCodec codec,
                             Path[] inputs, boolean deleteInputs, 
                             int mergeFactor, Path tmpDir,
@@ -65,7 +64,7 @@
   throws IOException {
     return 
       new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator, 
-                           reporter, null).merge(keyClass, valueClass,
+                           reporter, null).merge(
                                            mergeFactor, tmpDir,
                                            readsCounter, writesCounter, 
                                            mergePhase);
@@ -73,7 +72,6 @@
 
   public static <K extends Object, V extends Object>
   RawKeyValueIterator merge(Configuration conf, FileSystem fs,
-                            Class<K> keyClass, Class<V> valueClass, 
                             CompressionCodec codec,
                             Path[] inputs, boolean deleteInputs, 
                             int mergeFactor, Path tmpDir,
@@ -87,7 +85,6 @@
     return 
       new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator, 
                            reporter, mergedMapOutputsCounter).merge(
-                                           keyClass, valueClass,
                                            mergeFactor, tmpDir,
                                            readsCounter, writesCounter,
                                            mergePhase);
@@ -95,7 +92,6 @@
   
   public static <K extends Object, V extends Object>
   RawKeyValueIterator merge(Configuration conf, FileSystem fs, 
-                            Class<K> keyClass, Class<V> valueClass, 
                             List<Segment<K, V>> segments, 
                             int mergeFactor, Path tmpDir,
                             RawComparator<K> comparator, Progressable reporter,
@@ -103,14 +99,13 @@
                             Counters.Counter writesCounter,
                             Progress mergePhase)
       throws IOException {
-    return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir,
+    return merge(conf, fs, segments, mergeFactor, tmpDir,
                  comparator, reporter, false, readsCounter, writesCounter,
                  mergePhase);
   }
 
   public static <K extends Object, V extends Object>
   RawKeyValueIterator merge(Configuration conf, FileSystem fs,
-                            Class<K> keyClass, Class<V> valueClass,
                             List<Segment<K, V>> segments,
                             int mergeFactor, Path tmpDir,
                             RawComparator<K> comparator, Progressable reporter,
@@ -120,7 +115,7 @@
                             Progress mergePhase)
       throws IOException {
     return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
-                           sortSegments).merge(keyClass, valueClass,
+                           sortSegments).merge(
                                                mergeFactor, tmpDir,
                                                readsCounter, writesCounter,
                                                mergePhase);
@@ -128,7 +123,6 @@
 
   public static <K extends Object, V extends Object>
   RawKeyValueIterator merge(Configuration conf, FileSystem fs,
-                            Class<K> keyClass, Class<V> valueClass,
                             CompressionCodec codec,
                             List<Segment<K, V>> segments,
                             int mergeFactor, Path tmpDir,
@@ -139,7 +133,7 @@
                             Progress mergePhase)
       throws IOException {
     return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
-                           sortSegments, codec).merge(keyClass, valueClass,
+                           sortSegments, codec).merge(
                                                mergeFactor, tmpDir,
                                                readsCounter, writesCounter,
                                                mergePhase);
@@ -147,7 +141,6 @@
 
   public static <K extends Object, V extends Object>
     RawKeyValueIterator merge(Configuration conf, FileSystem fs,
-                            Class<K> keyClass, Class<V> valueClass,
                             List<Segment<K, V>> segments,
                             int mergeFactor, int inMemSegments, Path tmpDir,
                             RawComparator<K> comparator, Progressable reporter,
@@ -157,7 +150,7 @@
                             Progress mergePhase)
       throws IOException {
     return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
-                           sortSegments).merge(keyClass, valueClass,
+                           sortSegments).merge(
                                                mergeFactor, inMemSegments,
                                                tmpDir,
                                                readsCounter, writesCounter,
@@ -167,7 +160,6 @@
 
   static <K extends Object, V extends Object>
   RawKeyValueIterator merge(Configuration conf, FileSystem fs,
-                          Class<K> keyClass, Class<V> valueClass,
                           CompressionCodec codec,
                           List<Segment<K, V>> segments,
                           int mergeFactor, int inMemSegments, Path tmpDir,
@@ -178,7 +170,7 @@
                           Progress mergePhase)
     throws IOException {
   return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
-                         sortSegments, codec).merge(keyClass, valueClass,
+                         sortSegments, codec).merge(
                                              mergeFactor, inMemSegments,
                                              tmpDir,
                                              readsCounter, writesCounter,
@@ -524,21 +516,19 @@
       return comparator.compare(key1.getData(), s1, l1, key2.getData(), s2, l2) < 0;
     }
     
-    public RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
-                                     int factor, Path tmpDir,
+    public RawKeyValueIterator merge(int factor, Path tmpDir,
                                      Counters.Counter readsCounter,
                                      Counters.Counter writesCounter,
                                      Progress mergePhase)
         throws IOException {
-      return merge(keyClass, valueClass, factor, 0, tmpDir,
+      return merge(factor, 0, tmpDir,
                    readsCounter, writesCounter, mergePhase);
     }
 
-    RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
-                                     int factor, int inMem, Path tmpDir,
-                                     Counters.Counter readsCounter,
-                                     Counters.Counter writesCounter,
-                                     Progress mergePhase)
+    RawKeyValueIterator merge(int factor, int inMem, Path tmpDir,
+                              Counters.Counter readsCounter,
+                              Counters.Counter writesCounter,
+                              Progress mergePhase)
         throws IOException {
       LOG.info("Merging " + segments.size() + " sorted segments");
 
@@ -667,7 +657,7 @@
                                               approxOutputSize, conf);
 
           Writer<K, V> writer = 
-            new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass, codec,
+            new Writer<K, V>(conf, fs, outputFile, true, codec,
                              writesCounter);
           writeFile(this, writer, reporter, conf);
           writer.close();
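
Every Merger.merge() overload sheds its keyClass/valueClass parameters; the Writer created for on-disk merge output now builds serializers from the configuration (the 'true' flag above). A sketch of the slimmed-down overload that MapTask calls, under the assumption of package-level access to Merger:

    package org.apache.hadoop.mapred;

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.util.Progress;
    import org.apache.hadoop.util.Progressable;

    class MergeCallSketch {
      static <K, V> RawKeyValueIterator merge(JobConf job, FileSystem rfs,
          CompressionCodec codec, List<Merger.Segment<K, V>> segments,
          int factor, Path tmpDir, RawComparator<K> comparator,
          Progressable reporter, Counters.Counter spilled, Progress phase)
          throws IOException {
        // No key/value Class arguments anywhere in the chain.
        return Merger.merge(job, rfs, codec, segments, factor, tmpDir,
            comparator, reporter, true, null, spilled, phase);
      }
    }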

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=898019&r1=898018&r2=898019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Mon Jan 11 19:22:42 2010
@@ -24,6 +24,7 @@
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
@@ -42,6 +43,7 @@
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.serializer.SerializationBase;
 import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -194,11 +196,11 @@
           extends ValuesIterator<KEY,VALUE> {
     public ReduceValuesIterator (RawKeyValueIterator in,
                                  RawComparator<KEY> comparator, 
-                                 Class<KEY> keyClass,
-                                 Class<VALUE> valClass,
+                                 Map<String, String> keyMetadata,
+                                 Map<String, String> valueMetadata,
                                  Configuration conf, Progressable reporter)
       throws IOException {
-      super(in, comparator, keyClass, valClass, conf, reporter);
+      super(in, comparator, keyMetadata, valueMetadata, conf, reporter);
     }
 
     @Override
@@ -224,18 +226,19 @@
      private Counters.Counter skipGroupCounter;
      private Counters.Counter skipRecCounter;
      private long grpIndex = -1;
-     private Class<KEY> keyClass;
-     private Class<VALUE> valClass;
+     private Map<String, String> keyMetadata;
+     private Map<String, String> valueMetadata;
      private SequenceFile.Writer skipWriter;
      private boolean toWriteSkipRecs;
      private boolean hasNext;
      private TaskReporter reporter;
      
      public SkippingReduceValuesIterator(RawKeyValueIterator in,
-         RawComparator<KEY> comparator, Class<KEY> keyClass,
-         Class<VALUE> valClass, Configuration conf, TaskReporter reporter,
+         RawComparator<KEY> comparator,
+         Map<String, String> keyMetadata, Map<String, String> valueMetadata,
+         Configuration conf, TaskReporter reporter,
          TaskUmbilicalProtocol umbilical) throws IOException {
-       super(in, comparator, keyClass, valClass, conf, reporter);
+       super(in, comparator, keyMetadata, valueMetadata, conf, reporter);
        this.umbilical = umbilical;
        this.skipGroupCounter = 
          reporter.getCounter(TaskCounter.REDUCE_SKIPPED_GROUPS);
@@ -243,8 +246,8 @@
          reporter.getCounter(TaskCounter.REDUCE_SKIPPED_RECORDS);
        this.toWriteSkipRecs = toWriteSkipRecs() &&  
          SkipBadRecords.getSkipOutputPath(conf)!=null;
-       this.keyClass = keyClass;
-       this.valClass = valClass;
+       this.keyMetadata = keyMetadata;
+       this.valueMetadata = valueMetadata;
        this.reporter = reporter;
        skipIt = getSkipRanges().skipRangeIterator();
        mayBeSkip();
@@ -290,15 +293,39 @@
        skipRecCounter.increment(skipRec);
        reportNextRecordRange(umbilical, grpIndex);
      }
-     
+
+     private Class<?> getClassFromMetadata(Map<String, String> metadata)
+         throws IOException {
+       String classname = metadata.get(SerializationBase.CLASS_KEY);
+       if (classname == null) {
+         throw new IOException(
+             "No key class name specified; Record-skipping requires the use of "
+             + "Writable serialization");
+       }
+       try {
+         return conf.getClassByName(classname);
+       } catch (ClassNotFoundException e) {
+         throw new IOException(e);
+       }
+     }
+
      @SuppressWarnings("unchecked")
      private void writeSkippedRec(KEY key, VALUE value) throws IOException{
        if(skipWriter==null) {
+         // Skipped record writing is currently done with SequenceFiles
+         // which are based on Writable serialization. This can only
+         // be done with Writable key and value types. Extract the
+         // key and value class names from the metadata. If this can't
+         // be done, then throw an IOException.
+         Class<?> keyClass = getClassFromMetadata(keyMetadata);
+         Class<?> valClass = getClassFromMetadata(valueMetadata);
+
+         // Now actually open the skipWriter.
          Path skipDir = SkipBadRecords.getSkipOutputPath(conf);
          Path skipFile = new Path(skipDir, getTaskID().toString());
          skipWriter = SequenceFile.createWriter(
                skipFile.getFileSystem(conf), conf, skipFile,
-               keyClass, valClass, 
+               keyClass, valClass,
                CompressionType.BLOCK, reporter);
        }
        skipWriter.append(key, value);
@@ -358,8 +385,8 @@
       rIter = shuffle.run();
     } else {
       final FileSystem rfs = FileSystem.getLocal(job).getRaw();
-      rIter = Merger.merge(job, rfs, job.getMapOutputKeyClass(),
-                           job.getMapOutputValueClass(), codec, 
+      rIter = Merger.merge(job, rfs,
+                           codec,
                            getMapFiles(rfs, true),
                            !conf.getKeepFailedTaskFiles(), 
                            job.getInt(JobContext.IO_SORT_FACTOR, 100),
@@ -373,16 +400,18 @@
     sortPhase.complete();                         // sort is complete
     setPhase(TaskStatus.Phase.REDUCE); 
     statusUpdate(umbilical);
-    Class keyClass = job.getMapOutputKeyClass();
-    Class valueClass = job.getMapOutputValueClass();
+    Map<String, String> keyMetadata =
+        job.getMapOutputKeySerializationMetadata();
+    Map<String, String> valueMetadata =
+        job.getMapOutputValueSerializationMetadata();
     RawComparator comparator = job.getOutputValueGroupingComparator();
 
     if (useNewApi) {
-      runNewReducer(job, umbilical, reporter, rIter, comparator, 
-                    keyClass, valueClass);
+      runNewReducer(job, umbilical, reporter, rIter, comparator,
+                    keyMetadata, valueMetadata);
     } else {
-      runOldReducer(job, umbilical, reporter, rIter, comparator, 
-                    keyClass, valueClass);
+      runOldReducer(job, umbilical, reporter, rIter, comparator,
+                    keyMetadata, valueMetadata);
     }
     done(umbilical, reporter);
   }
@@ -394,8 +423,8 @@
                      final TaskReporter reporter,
                      RawKeyValueIterator rIter,
                      RawComparator<INKEY> comparator,
-                     Class<INKEY> keyClass,
-                     Class<INVALUE> valueClass) throws IOException {
+                     Map<String, String> keyMetadata,
+                     Map<String, String> valueMetadata) throws IOException {
     Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer = 
       ReflectionUtils.newInstance(job.getReducerClass(), job);
     // make output collector
@@ -425,10 +454,10 @@
       
       ReduceValuesIterator<INKEY,INVALUE> values = isSkipping() ? 
           new SkippingReduceValuesIterator<INKEY,INVALUE>(rIter, 
-              comparator, keyClass, valueClass, 
+              comparator, keyMetadata, valueMetadata,
               job, reporter, umbilical) :
           new ReduceValuesIterator<INKEY,INVALUE>(rIter, 
-          job.getOutputValueGroupingComparator(), keyClass, valueClass, 
+          job.getOutputValueGroupingComparator(), keyMetadata, valueMetadata,
           job, reporter);
       values.informReduceProgress();
       while (values.more()) {
@@ -490,8 +519,8 @@
                      final TaskReporter reporter,
                      RawKeyValueIterator rIter,
                      RawComparator<INKEY> comparator,
-                     Class<INKEY> keyClass,
-                     Class<INVALUE> valueClass
+                     Map<String, String> keyMetadata,
+                     Map<String, String> valueMetadata
                      ) throws IOException,InterruptedException, 
                               ClassNotFoundException {
     // wrap value iterator to report progress.
@@ -535,8 +564,8 @@
                                                reduceInputValueCounter, 
                                                trackedRW,
                                                committer,
-                                               reporter, comparator, keyClass,
-                                               valueClass);
+                                               reporter, comparator,
+                                               keyMetadata, valueMetadata);
     reducer.run(reducerContext);
     output.close(reducerContext);
   }
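
One limitation surfaces in SkippingReduceValuesIterator: skipped records are written to a SequenceFile, which still needs concrete classes, so record skipping only works when the serialization metadata names one (true for Writable and class-based job data, not necessarily for schema-based Avro data). A condensed sketch of that guard:

    import java.io.IOException;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.serializer.SerializationBase;

    public class SkipGuardSketch {
      public static Class<?> requireClass(Configuration conf,
          Map<String, String> metadata) throws IOException {
        // Same logic as getClassFromMetadata() above.
        String name = metadata.get(SerializationBase.CLASS_KEY);
        if (name == null) {
          throw new IOException(
              "record skipping requires class-based serialization");
        }
        try {
          return conf.getClassByName(name);
        } catch (ClassNotFoundException e) {
          throw new IOException(e);
        }
      }
    }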

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=898019&r1=898018&r2=898019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java Mon Jan 11 19:22:42 2010
@@ -43,7 +43,7 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.DeserializerBase;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapreduce.TaskCounter;
@@ -1004,24 +1004,28 @@
     private boolean more;                         // more in file
     private RawComparator<KEY> comparator;
     protected Progressable reporter;
-    private Deserializer<KEY> keyDeserializer;
-    private Deserializer<VALUE> valDeserializer;
+    private DeserializerBase<KEY> keyDeserializer;
+    private DeserializerBase<VALUE> valDeserializer;
     private DataInputBuffer keyIn = new DataInputBuffer();
     private DataInputBuffer valueIn = new DataInputBuffer();
     
     public ValuesIterator (RawKeyValueIterator in, 
                            RawComparator<KEY> comparator, 
-                           Class<KEY> keyClass,
-                           Class<VALUE> valClass, Configuration conf, 
+                           Map<String, String> keyMetadata,
+                           Map<String, String> valueMetadata,
+                           Configuration conf,
                            Progressable reporter)
       throws IOException {
       this.in = in;
       this.comparator = comparator;
       this.reporter = reporter;
-      SerializationFactory serializationFactory = new SerializationFactory(conf);
-      this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+      SerializationFactory serializationFactory =
+          new SerializationFactory(conf);
+      this.keyDeserializer =
+          serializationFactory.getDeserializer(keyMetadata);
       this.keyDeserializer.open(keyIn);
-      this.valDeserializer = serializationFactory.getDeserializer(valClass);
+      this.valDeserializer =
+          serializationFactory.getDeserializer(valueMetadata);
       this.valDeserializer.open(this.valueIn);
       readNextKey();
       key = nextKey;
@@ -1112,10 +1116,10 @@
     private final Counters.Counter combineInputCounter;
 
     public CombineValuesIterator(RawKeyValueIterator in,
-        RawComparator<KEY> comparator, Class<KEY> keyClass,
-        Class<VALUE> valClass, Configuration conf, Reporter reporter,
+        RawComparator<KEY> comparator, Map<String, String> keyMetadata,
+        Map<String, String> valueMetadata, Configuration conf, Reporter reporter,
         Counters.Counter combineInputCounter) throws IOException {
-      super(in, comparator, keyClass, valClass, conf, reporter);
+      super(in, comparator, keyMetadata, valueMetadata, conf, reporter);
       this.combineInputCounter = combineInputCounter;
     }
 
@@ -1139,7 +1143,8 @@
                       org.apache.hadoop.mapreduce.OutputCommitter committer,
                       org.apache.hadoop.mapreduce.StatusReporter reporter,
                       RawComparator<INKEY> comparator,
-                      Class<INKEY> keyClass, Class<INVALUE> valueClass
+                      Map<String, String> inKeyMetadata,
+                      Map<String, String> inValMetadata
   ) throws IOException, InterruptedException {
     org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
     reduceContext = 
@@ -1151,8 +1156,8 @@
                                                               committer, 
                                                               reporter, 
                                                               comparator, 
-                                                              keyClass, 
-                                                              valueClass);
+                                                              inKeyMetadata,
+                                                              inValMetadata);
 
     org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
         reducerContext = 
@@ -1216,8 +1221,8 @@
   
   protected static class OldCombinerRunner<K,V> extends CombinerRunner<K,V> {
     private final Class<? extends Reducer<K,V,K,V>> combinerClass;
-    private final Class<K> keyClass;
-    private final Class<V> valueClass;
+    private final Map<String, String> keyMetadata;
+    private final Map<String, String> valueMetadata;
     private final RawComparator<K> comparator;
 
     @SuppressWarnings("unchecked")
@@ -1227,8 +1232,8 @@
                                 TaskReporter reporter) {
       super(inputCounter, conf, reporter);
       combinerClass = cls;
-      keyClass = (Class<K>) job.getMapOutputKeyClass();
-      valueClass = (Class<V>) job.getMapOutputValueClass();
+      keyMetadata = job.getMapOutputKeySerializationMetadata();
+      valueMetadata = job.getMapOutputValueSerializationMetadata();
       comparator = (RawComparator<K>) job.getOutputKeyComparator();
     }
 
@@ -1240,8 +1245,8 @@
         ReflectionUtils.newInstance(combinerClass, job);
       try {
         CombineValuesIterator<K,V> values = 
-          new CombineValuesIterator<K,V>(kvIter, comparator, keyClass, 
-                                         valueClass, job, Reporter.NULL,
+          new CombineValuesIterator<K,V>(kvIter, comparator, keyMetadata,
+                                         valueMetadata, job, Reporter.NULL,
                                          inputCounter);
         while (values.more()) {
           combiner.reduce(values.getKey(), values, combineCollector,
@@ -1259,8 +1264,8 @@
         reducerClass;
     private final org.apache.hadoop.mapreduce.TaskAttemptID taskId;
     private final RawComparator<K> comparator;
-    private final Class<K> keyClass;
-    private final Class<V> valueClass;
+    private final Map<String, String> keyMetadata;
+    private final Map<String, String> valueMetadata;
     private final org.apache.hadoop.mapreduce.OutputCommitter committer;
 
     @SuppressWarnings("unchecked")
@@ -1274,8 +1279,8 @@
       super(inputCounter, job, reporter);
       this.reducerClass = reducerClass;
       this.taskId = taskId;
-      keyClass = (Class<K>) context.getMapOutputKeyClass();
-      valueClass = (Class<V>) context.getMapOutputValueClass();
+      keyMetadata = context.getMapOutputKeySerializationMetadata();
+      valueMetadata = context.getMapOutputValueSerializationMetadata();
       comparator = (RawComparator<K>) context.getSortComparator();
       this.committer = committer;
     }
@@ -1313,8 +1318,9 @@
                                                 iterator, null, inputCounter, 
                                                 new OutputConverter(collector),
                                                 committer,
-                                                reporter, comparator, keyClass,
-                                                valueClass);
+                                                reporter, comparator,
+                                                keyMetadata,
+                                                valueMetadata);
       reducer.run(reducerContext);
     } 
   }
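
For illustration, a minimal sketch of the metadata-driven pattern the rewritten iterator above now uses. It relies only on calls visible in this patch; the empty input stream is a placeholder, and the metadata map is assumed to have been populated beforehand (for example by the jobdata helpers added below).

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.serializer.DeserializerBase;
    import org.apache.hadoop.io.serializer.SerializationFactory;
    import org.apache.hadoop.mapreduce.JobContext;

    public class MetadataDeserializerSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumed: a job has already stored key metadata under this name.
        Map<String, String> keyMetadata =
            conf.getMap(JobContext.MAP_OUTPUT_KEY_METADATA);
        // The factory selects a serialization from the metadata map rather
        // than from a key class, as in the constructor rewritten above.
        SerializationFactory factory = new SerializationFactory(conf);
        DeserializerBase<Object> keyDeserializer =
            factory.getDeserializer(keyMetadata);
        InputStream keyIn = new ByteArrayInputStream(new byte[0]); // placeholder
        keyDeserializer.open(keyIn);
      }
    }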

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java?rev=898019&r1=898018&r2=898019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java Mon Jan 11 19:22:42 2010
@@ -26,9 +26,11 @@
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
+import java.net.URI;
 import java.net.URL;
 import java.net.URLConnection;
-import java.net.URI;
+import java.util.Arrays;
+import java.util.Map;
 
 import javax.security.auth.login.LoginException;
 
@@ -41,6 +43,7 @@
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.lib.jobdata.ClassBasedJobData;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.util.ConfigUtil;
 import org.apache.hadoop.security.UnixUserGroupInformation;
@@ -628,32 +631,41 @@
                   Partitioner.class);
   }
 
  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different than the final output
   * key class.
-   * 
+   *
+   * @deprecated Use ClassBasedJobData.setMapOutputKeyClass() instead.
+   *
   * @param theClass the map output key class.
   * @throws IllegalStateException if the job is submitted
   */
+  @Deprecated
   public void setMapOutputKeyClass(Class<?> theClass
                                    ) throws IllegalStateException {
-    ensureState(JobState.DEFINE);
-    conf.setMapOutputKeyClass(theClass);
+    LOG.warn(
+        "Deprecated: Use ClassBasedJobData.setMapOutputKeyClass() instead");
+    ClassBasedJobData.setMapOutputKeyClass(conf, theClass);
   }
 
  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different than the final output
   * value class.
-   * 
+   *
+   * @deprecated Use ClassBasedJobData.setMapOutputValueClass() instead.
+   *
   * @param theClass the map output value class.
   * @throws IllegalStateException if the job is submitted
   */
+  @Deprecated
   public void setMapOutputValueClass(Class<?> theClass
                                      ) throws IllegalStateException {
-    ensureState(JobState.DEFINE);
-    conf.setMapOutputValueClass(theClass);
+    LOG.warn(
+        "Deprecated: Use ClassBasedJobData.setMapOutputValueClass() "
+        + "instead");
+    ClassBasedJobData.setMapOutputValueClass(conf, theClass);
   }
 
   /**
@@ -856,6 +868,32 @@
   }
 
   /**
+   * Set the metadata used by the serialization framework to instantiate
+   * (de)serializers for key data emitted by mappers.
+   *
+   * @param metadata the metadata used by the serialization framework for
+   * the mapper output key.
+   */
+  public void setMapOutputKeySerializationMetadata(
+      Map<String, String> metadata) {
+    ensureState(JobState.DEFINE);
+    conf.setMap(MAP_OUTPUT_KEY_METADATA, metadata);
+  }
+
+  /**
+   * Set the metadata used by the serialization framework to instantiate
+   * (de)serializers for value data emitted by mappers.
+   *
+   * @param metadata the metadata used by the serialization framework for
+   * the mapper output value.
+   */
+  public void setMapOutputValueSerializationMetadata(
+      Map<String, String> metadata) {
+    ensureState(JobState.DEFINE);
+    conf.setMap(MAP_OUTPUT_VALUE_METADATA, metadata);
+  }
+
+  /**
    * Set whether the system should collect profiler information for some of 
   * the tasks in this job. The information is stored in the user log 
    * directory.
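
As a hedged usage sketch of the two setters above: the metadata map below mirrors what AvroGenericJobData (added later in this commit) writes, with an inline string schema; in practice the jobdata helper classes are the intended entry point.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.serializer.SerializationBase;
    import org.apache.hadoop.io.serializer.avro.AvroGenericSerialization;
    import org.apache.hadoop.io.serializer.avro.AvroSerialization;
    import org.apache.hadoop.mapreduce.Job;

    public class MetadataJobSetup {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration());
        // Hand-built metadata, equivalent to what
        // AvroGenericJobData.setMapOutputKeySchema writes for a string schema.
        Map<String, String> keyMetadata = new HashMap<String, String>();
        keyMetadata.put(SerializationBase.SERIALIZATION_KEY,
            AvroGenericSerialization.class.getName());
        keyMetadata.put(AvroSerialization.AVRO_SCHEMA_KEY, "\"string\"");
        job.setMapOutputKeySerializationMetadata(keyMetadata);
      }
    }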

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=898019&r1=898018&r2=898019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java Mon Jan 11 19:22:42 2010
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -27,6 +28,8 @@
 import org.apache.hadoop.conf.Configuration.IntegerRanges;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.serializer.DeserializerBase;
+import org.apache.hadoop.io.serializer.SerializerBase;
 import org.apache.hadoop.mapreduce.Mapper;
 
 /**
@@ -173,10 +176,20 @@
     "mapreduce.map.output.compress";
   public static final String MAP_OUTPUT_COMPRESS_CODEC = 
     "mapreduce.map.output.compress.codec";
+  @Deprecated
   public static final String MAP_OUTPUT_KEY_CLASS = 
     "mapreduce.map.output.key.class";
+  public static final String MAP_OUTPUT_KEY_METADATA =
+    "mapreduce.map.output.key.metadata";
+  public static final String MAP_OUTPUT_KEY_METADATA_SET =
+    "mapreduce.map.output.key.metadata.jobdata.initialized";
+  @Deprecated
   public static final String MAP_OUTPUT_VALUE_CLASS = 
     "mapreduce.map.output.value.class";
+  public static final String MAP_OUTPUT_VALUE_METADATA =
+    "mapreduce.map.output.value.metadata";
+  public static final String MAP_OUTPUT_VALUE_METADATA_SET =
+    "mapreduce.map.output.value.metadata.jobdata.initialized";
   public static final String MAP_OUTPUT_KEY_FIELD_SEPERATOR = 
     "mapreduce.map.output.key.field.separator";
   public static final String MAP_LOG_LEVEL = "mapreduce.map.log.level";
@@ -269,19 +282,24 @@
    */
   public Class<?> getOutputValueClass();
 
  /**
   * Get the key class for the map output data. If it is not set, use the
   * (final) output key class. This allows the map output key class to be
   * different than the final output key class.
+   *
+   * @deprecated Use ClassBasedJobData.getMapOutputKeyClass() instead.
   * @return the map output key class.
   */
+  @Deprecated
   public Class<?> getMapOutputKeyClass();
 
  /**
   * Get the value class for the map output data. If it is not set, use the
   * (final) output value class. This allows the map output value class to be
   * different than the final output value class.
-   *  
+   *
+   * @deprecated Use ClassBasedJobData.getMapOutputValueClass() instead.
   * @return the map output value class.
   */
+  @Deprecated
   public Class<?> getMapOutputValueClass();
@@ -343,6 +361,34 @@
      throws ClassNotFoundException;
 
   /**
+   * Get the serializer to encode keys from the mapper.
+   *
+   * @return the {@link SerializerBase} for the mapper output keys.
+   */
+  public <T> SerializerBase<T> getMapOutputKeySerializer();
+
+  /**
+   * Get the deserializer to decode keys from the mapper.
+   *
+   * @return the {@link DeserializerBase} for the mapper output keys.
+   */
+  public <T> DeserializerBase<T> getMapOutputKeyDeserializer();
+
+  /**
+   * Get the serializer to encode values from the mapper.
+   *
+   * @return the {@link SerializerBase} for the mapper output values.
+   */
+  public <T> SerializerBase<T> getMapOutputValueSerializer();
+
+  /**
+   * Get the deserializer to decode values from the mapper.
+   *
+   * @return the {@link DeserializerBase} for the mapper output values.
+   */
+  public <T> DeserializerBase<T> getMapOutputValueDeserializer();
+
+  /**
    * Get the {@link RawComparator} comparator used to compare keys.
    * 
    * @return the {@link RawComparator} comparator used to compare keys.
@@ -481,4 +527,21 @@
    */
   public int getMaxReduceAttempts();
 
+  /**
+   * Get the metadata used by the serialization framework to instantiate
+   * (de)serializers for key data emitted by mappers.
+   *
+   * @return the metadata used by the serialization framework for the mapper
+   * output key.
+   */
+  public Map<String, String> getMapOutputKeySerializationMetadata();
+
+  /**
+   * Get the metadata used by the serialization framework to instantiate
+   * (de)serializers for value data emitted by mappers.
+   *
+   * @return the metadata used by the serialization framework for the mapper
+   * output value.
+   */
+  public Map<String, String> getMapOutputValueSerializationMetadata();
 }
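
A small sketch of consuming the new accessors declared above; `probe` takes any JobContext and only prints what the framework would hand to the shuffle. Purely illustrative.

    import java.util.Map;

    import org.apache.hadoop.io.serializer.DeserializerBase;
    import org.apache.hadoop.io.serializer.SerializerBase;
    import org.apache.hadoop.mapreduce.JobContext;

    public class ContextProbe {
      // Sketch: inspect the serialization configuration a context carries.
      public static void probe(JobContext context) {
        Map<String, String> keyMeta =
            context.getMapOutputKeySerializationMetadata();
        Map<String, String> valMeta =
            context.getMapOutputValueSerializationMetadata();
        SerializerBase<Object> keySerializer =
            context.getMapOutputKeySerializer();
        DeserializerBase<Object> valDeserializer =
            context.getMapOutputValueDeserializer();
        System.out.println("key metadata:     " + keyMeta);
        System.out.println("value metadata:   " + valMeta);
        System.out.println("key serializer:   " + keySerializer.getClass());
        System.out.println("val deserializer: " + valDeserializer.getClass());
      }
    }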

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java?rev=898019&r1=898018&r2=898019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java Mon Jan 11 19:22:42 2010
@@ -18,12 +18,15 @@
 package org.apache.hadoop.mapreduce.lib.chain;
 
 import java.io.IOException;
+import java.util.Map;
 import java.net.URI;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration.IntegerRanges;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.serializer.DeserializerBase;
+import org.apache.hadoop.io.serializer.SerializerBase;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -254,6 +257,36 @@
   }
 
   @Override
+  public <T> SerializerBase<T> getMapOutputKeySerializer() {
+    return base.getMapOutputKeySerializer();
+  }
+
+  @Override
+  public <T> SerializerBase<T> getMapOutputValueSerializer() {
+    return base.getMapOutputValueSerializer();
+  }
+
+  @Override
+  public <T> DeserializerBase<T> getMapOutputKeyDeserializer() {
+    return base.getMapOutputKeyDeserializer();
+  }
+
+  @Override
+  public <T> DeserializerBase<T> getMapOutputValueDeserializer() {
+    return base.getMapOutputValueDeserializer();
+  }
+
+  @Override
+  public Map<String, String> getMapOutputKeySerializationMetadata() {
+    return base.getMapOutputKeySerializationMetadata();
+  }
+
+  @Override
+  public Map<String, String> getMapOutputValueSerializationMetadata() {
+    return base.getMapOutputValueSerializationMetadata();
+  }
+
+  @Override
   public Class<? extends Partitioner<?, ?>> getPartitionerClass()
       throws ClassNotFoundException {
     return base.getPartitionerClass();

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java?rev=898019&r1=898018&r2=898019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java Mon Jan 11 19:22:42 2010
@@ -18,12 +18,15 @@
 package org.apache.hadoop.mapreduce.lib.chain;
 
 import java.io.IOException;
+import java.util.Map;
 import java.net.URI;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration.IntegerRanges;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.serializer.DeserializerBase;
+import org.apache.hadoop.io.serializer.SerializerBase;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.JobID;
@@ -247,6 +250,36 @@
   }
 
   @Override
+  public <T> SerializerBase<T> getMapOutputKeySerializer() {
+    return base.getMapOutputKeySerializer();
+  }
+
+  @Override
+  public <T> SerializerBase<T> getMapOutputValueSerializer() {
+    return base.getMapOutputValueSerializer();
+  }
+
+  @Override
+  public <T> DeserializerBase<T> getMapOutputKeyDeserializer() {
+    return base.getMapOutputKeyDeserializer();
+  }
+
+  @Override
+  public <T> DeserializerBase<T> getMapOutputValueDeserializer() {
+    return base.getMapOutputValueDeserializer();
+  }
+
+  @Override
+  public Map<String, String> getMapOutputKeySerializationMetadata() {
+    return base.getMapOutputKeySerializationMetadata();
+  }
+
+  @Override
+  public Map<String, String> getMapOutputValueSerializationMetadata() {
+    return base.getMapOutputValueSerializationMetadata();
+  }
+
+  @Override
   public Class<? extends Partitioner<?, ?>> getPartitionerClass()
       throws ClassNotFoundException {
     return base.getPartitionerClass();

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/AvroGenericJobData.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/AvroGenericJobData.java?rev=898019&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/AvroGenericJobData.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/AvroGenericJobData.java Mon Jan 11 19:22:42 2010
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.jobdata;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.serializer.SerializationBase;
+import org.apache.hadoop.io.serializer.avro.AvroGenericSerialization;
+import org.apache.hadoop.io.serializer.avro.AvroSerialization;
+import org.apache.hadoop.mapreduce.JobContext;
+
+/**
+ * Methods that configure the use of AvroGenericSerialization
+ * for intermediate and output types.
+ */
+public class AvroGenericJobData extends SchemaBasedJobData {
+  protected AvroGenericJobData() { }
+
+  /**
+   * Set the key schema for the map output data. This allows the user to
+   * specify the map output key schema to be different than the final output
+   * key schema.
+   *
+   * @param schema the map output key schema.
+   */
+  public static void setMapOutputKeySchema(Configuration conf, Schema schema) {
+    Map<String, String> metadata = new HashMap<String, String>();
+    metadata.put(SerializationBase.SERIALIZATION_KEY,
+        AvroGenericSerialization.class.getName());
+    metadata.put(AvroSerialization.AVRO_SCHEMA_KEY, schema.toString());
+    conf.setMap(JobContext.MAP_OUTPUT_KEY_METADATA, metadata);
+    conf.setBoolean(JobContext.MAP_OUTPUT_KEY_METADATA_SET, true);
+  }
+
+  /**
+   * Set the value schema for the map output data. This allows the user to
+   * specify the map output value schema to be different than the final output
+   * value schema.
+   *
+   * @param schema the map output value schema.
+   */
+  public static void setMapOutputValueSchema(Configuration conf,
+      Schema schema) {
+    Map<String, String> metadata = new HashMap<String, String>();
+    metadata.put(SerializationBase.SERIALIZATION_KEY,
+        AvroGenericSerialization.class.getName());
+    metadata.put(AvroSerialization.AVRO_SCHEMA_KEY, schema.toString());
+    conf.setMap(JobContext.MAP_OUTPUT_VALUE_METADATA, metadata);
+    conf.setBoolean(JobContext.MAP_OUTPUT_VALUE_METADATA_SET, true);
+  }
+}
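
A hedged usage sketch for the class above; the schemas are illustrative (parsed with the Avro Schema.parse API of this era), not taken from the commit.

    import org.apache.avro.Schema;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.lib.jobdata.AvroGenericJobData;

    public class AvroIntermediateSetup {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Illustrative schemas: a string key and a single-field record value.
        Schema keySchema = Schema.parse("\"string\"");
        Schema valueSchema = Schema.parse(
            "{\"type\":\"record\",\"name\":\"Count\",\"fields\":"
            + "[{\"name\":\"n\",\"type\":\"long\"}]}");
        AvroGenericJobData.setMapOutputKeySchema(conf, keySchema);
        AvroGenericJobData.setMapOutputValueSchema(conf, valueSchema);
      }
    }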

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/AvroReflectJobData.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/AvroReflectJobData.java?rev=898019&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/AvroReflectJobData.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/AvroReflectJobData.java Mon Jan 11 19:22:42 2010
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.jobdata;
+
+/**
+ * Methods that configure the use of AvroReflectSerialization
+ * for intermediate and output types.
+ */
+public class AvroReflectJobData extends ClassBasedJobData {
+  protected AvroReflectJobData() { }
+}

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/AvroSpecificJobData.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/AvroSpecificJobData.java?rev=898019&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/AvroSpecificJobData.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/AvroSpecificJobData.java Mon Jan 11 19:22:42 2010
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.jobdata;
+
+/**
+ * Methods that configure the use of AvroSpecificSerialization
+ * for intermediate and output types.
+ */
+public class AvroSpecificJobData extends ClassBasedJobData {
+  protected AvroSpecificJobData() { }
+}

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/ClassBasedJobData.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/ClassBasedJobData.java?rev=898019&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/ClassBasedJobData.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/ClassBasedJobData.java Mon Jan 11 19:22:42 2010
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.jobdata;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.SerializationBase;
+import org.apache.hadoop.mapreduce.JobContext;
+
+/**
+ * Methods that configure the use of class-based serialization mechanisms
+ * for intermediate and output types.
+ *
+ * This is the base for configuration of Writable, Java, AvroReflect,
+ * and AvroSpecific serialization mechanisms.
+ */
+public class ClassBasedJobData {
+  protected ClassBasedJobData() { }
+
+  /**
+   * Resolve the class used as a mapper output type.
+   * Works for both key and value types; the exact parameters
+   * to check are specified by the caller.
+   *
+   * We first check whether the class was set via the serialization
+   * metadata API. If not, then fall back to the deprecated class-specifying
+   * serialization API. If that's not set, then intermediate types are not
+   * specified for this job; use the job's output types -- which may be the
+   * defaults, if the user did not specify those either.
+   *
+   * @param conf The configuration to check.
+   * @param metadataMapName the key identifying the serialization metadata map.
+   * @param deprecatedClassKey the deprecated conf key identifying the class
+   *   as set in the pre-metadata serialization API.
+   * @param jobOutputClassKey the conf key identifying the job output class
+   *   as set via the pre-metadata serialization API.
+   * @param defaultValue the class to return if no output type can
+   *   be found in the configuration.
+   * @throws RuntimeException if the class could not be found.
+   * @return the class which is the mapper output type.
+   */
+  private static Class<?> getMapOutputClass(Configuration conf,
+      String metadataMapName, String deprecatedClassKey,
+      String jobOutputClassKey, Class<?> defaultValue) {
+
+    Map<String, String> metadata = conf.getMap(metadataMapName);
+    String className = metadata.get(SerializationBase.CLASS_KEY);
+    if (null == className) {
+      // Not stored in a map. Might be in the deprecated parameter.
+      className = conf.get(deprecatedClassKey);
+    }
+
+    if (null == className) {
+      // Not set through either mechanism. Fall back to the job output class.
+      className = conf.get(jobOutputClassKey);
+    }
+
+    // Resolve this to a Class object.
+    if (null == className) {
+      return defaultValue; // Return the caller-supplied default class.
+    } else {
+      try {
+        return conf.getClassByName(className);
+      } catch (ClassNotFoundException cnfe) {
+        throw new RuntimeException(cnfe);
+      }
+    }
+  }
+
+
+  /**
+   * Set the key class for the map output data. This allows the user to
+   * specify the map output key class to be different than the final output
+   * key class.
+   *
+   * @param theClass the map output key class.
+   */
+  public static void setMapOutputKeyClass(Configuration conf,
+      Class<?> theClass) {
+    Map<String, String> metadata =
+        SerializationBase.getMetadataFromClass(theClass);
+    conf.setMap(JobContext.MAP_OUTPUT_KEY_METADATA, metadata);
+    conf.setBoolean(JobContext.MAP_OUTPUT_KEY_METADATA_SET, true);
+  }
+
+  /**
+   * Get the key class for the map output data.
+   * @return the map output key class.
+   */
+  public static Class<?> getMapOutputKeyClass(Configuration conf) {
+    return getMapOutputClass(conf, JobContext.MAP_OUTPUT_KEY_METADATA,
+        JobContext.MAP_OUTPUT_KEY_CLASS,
+        JobContext.OUTPUT_KEY_CLASS,
+        LongWritable.class);
+  }
+
+  /**
+   * Set the value class for the map output data. This allows the user to
+   * specify the map output value class to be different than the final output
+   * value class.
+   *
+   * @param theClass the map output value class.
+   */
+  public static void setMapOutputValueClass(Configuration conf,
+      Class<?> theClass) {
+    Map<String, String> metadata =
+        SerializationBase.getMetadataFromClass(theClass);
+    conf.setMap(JobContext.MAP_OUTPUT_VALUE_METADATA, metadata);
+    conf.setBoolean(JobContext.MAP_OUTPUT_VALUE_METADATA_SET, true);
+  }
+
+  /**
+   * Get the value class for the map output data.
+   * @return the map output value class.
+   */
+  public static Class<?> getMapOutputValueClass(Configuration conf) {
+    return getMapOutputClass(conf, JobContext.MAP_OUTPUT_VALUE_METADATA,
+        JobContext.MAP_OUTPUT_VALUE_CLASS,
+        JobContext.OUTPUT_VALUE_CLASS,
+        Text.class);
+  }
+}
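
For comparison with the deprecated Job setters earlier in this patch, a minimal sketch of the replacement API; the types chosen here are arbitrary Writables.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.jobdata.ClassBasedJobData;

    public class ClassBasedSetup {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        ClassBasedJobData.setMapOutputKeyClass(conf, Text.class);
        ClassBasedJobData.setMapOutputValueClass(conf, IntWritable.class);
        // Resolution order documented above: metadata map, then the
        // deprecated class key, then the job output class, then the default.
        System.out.println(ClassBasedJobData.getMapOutputKeyClass(conf));
      }
    }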

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/JavaSerializationJobData.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/JavaSerializationJobData.java?rev=898019&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/JavaSerializationJobData.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/JavaSerializationJobData.java Mon Jan 11 19:22:42 2010
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.jobdata;
+
+/**
+ * Methods that configure the use of JavaSerialization
+ * for intermediate and output types.
+ */
+public class JavaSerializationJobData extends ClassBasedJobData {
+  protected JavaSerializationJobData() { }
+}
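
Since this class only inherits the class-based helpers, usage matches ClassBasedJobData; a sketch with plain Serializable types follows. Whether JavaSerialization is enabled in io.serializations for a given cluster is an assumption of this example.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.lib.jobdata.JavaSerializationJobData;

    public class JavaSerializationSetup {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // String and Long implement java.io.Serializable; resolving them to
        // JavaSerialization at runtime depends on the factory configuration.
        JavaSerializationJobData.setMapOutputKeyClass(conf, String.class);
        JavaSerializationJobData.setMapOutputValueClass(conf, Long.class);
      }
    }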


