hadoop-mapreduce-commits mailing list archives

From cutt...@apache.org
Subject svn commit: r898019 [2/2] - in /hadoop/mapreduce/trunk: ./ src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/ src/examples/org/apache/hadoop/examples/ src/examples/org/apache/hadoop/examples/pi/ src/java/org/apache/hadoop/mapred/ src/...
Date Mon, 11 Jan 2010 19:22:43 GMT
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/SchemaBasedJobData.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/SchemaBasedJobData.java?rev=898019&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/SchemaBasedJobData.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/SchemaBasedJobData.java Mon Jan 11 19:22:42 2010
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.jobdata;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.serializer.avro.AvroSerialization;
+import org.apache.hadoop.mapreduce.JobContext;
+
+/**
+ * Methods that configure the use of schema-based serialization mechanisms
+ * for intermediate and output types.
+ *
+ * This is the base for configuration of AvroGenericSerialization
+ * and other schema-based serialization mechanisms.
+ */
+public class SchemaBasedJobData {
+  protected SchemaBasedJobData() { }
+
+  /**
+   * Get the key schema for the map output data.
+   * @return the map output key schema.
+   */
+  public static Schema getMapOutputKeySchema(Configuration conf) {
+    Map<String, String> metadata = conf.getMap(
+        JobContext.MAP_OUTPUT_KEY_METADATA);
+    return Schema.parse(metadata.get(AvroSerialization.AVRO_SCHEMA_KEY));
+  }
+
+  /**
+   * Get the value schema for the map output data.
+   * @return the map output value schema.
+   */
+  public static Schema getMapOutputValueSchema(Configuration conf) {
+    Map<String, String> metadata = conf.getMap(
+        JobContext.MAP_OUTPUT_VALUE_METADATA);
+    return Schema.parse(metadata.get(AvroSerialization.AVRO_SCHEMA_KEY));
+  }
+
+  /**
+   * Set the key schema for the map output data. This allows the user to
+   * specify the map output key schema to be different than the final output
+   * key schema.
+   *
+   * @param schema the map output key schema.
+   */
+  public static void setMapOutputKeySchema(Configuration conf, Schema schema) {
+    Map<String, String> metadata = new HashMap<String, String>();
+    metadata.put(AvroSerialization.AVRO_SCHEMA_KEY, schema.toString());
+    conf.setMap(JobContext.MAP_OUTPUT_KEY_METADATA, metadata);
+    conf.setBoolean(JobContext.MAP_OUTPUT_KEY_METADATA_SET, true);
+  }
+
+  /**
+   * Set the value schema for the map output data. This allows the user to
+   * specify the map output value schema to be different than the final output
+   * value schema.
+   *
+   * @param schema the map output value schema.
+   */
+  public static void setMapOutputValueSchema(Configuration conf,
+      Schema schema) {
+    Map<String, String> metadata = new HashMap<String, String>();
+    metadata.put(AvroSerialization.AVRO_SCHEMA_KEY, schema.toString());
+    conf.setMap(JobContext.MAP_OUTPUT_VALUE_METADATA, metadata);
+    conf.setBoolean(JobContext.MAP_OUTPUT_VALUE_METADATA_SET, true);
+  }
+}
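
For context, a job driver that wants Avro-based serialization for its intermediate data would call these helpers roughly as follows. This is only a sketch; the configureAvroIntermediates name and the STRING/LONG schema choices are illustrative, not part of this commit.

    // Sketch (assumes: import org.apache.avro.Schema;
    //                  import org.apache.hadoop.conf.Configuration;
    //                  import org.apache.hadoop.mapreduce.lib.jobdata.SchemaBasedJobData;)
    static void configureAvroIntermediates(Configuration conf) {
      // The two setters are the ones added in SchemaBasedJobData above;
      // Schema.create and Schema.Type are standard Avro APIs.
      SchemaBasedJobData.setMapOutputKeySchema(conf, Schema.create(Schema.Type.STRING));
      SchemaBasedJobData.setMapOutputValueSchema(conf, Schema.create(Schema.Type.LONG));
    }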

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/WritableJobData.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/WritableJobData.java?rev=898019&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/WritableJobData.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobdata/WritableJobData.java Mon Jan 11 19:22:42 2010
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.jobdata;
+
+/**
+ * Methods that configure the use of WritableSerialization
+ * for intermediate and output types.
+ */
+public class WritableJobData extends ClassBasedJobData {
+  protected WritableJobData() { }
+}
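
Callers configure Writable intermediate types through this class much as the TestNoDefaultsJobConf and HadoopArchives hunks later in this commit do; a brief sketch, assuming the static setters are provided by the ClassBasedJobData parent (not shown in this part of the commit):

    // Sketch: Writable-based map output types via the new helper.
    // (assumes: import org.apache.hadoop.io.LongWritable;
    //           import org.apache.hadoop.io.Text;
    //           import org.apache.hadoop.mapreduce.lib.jobdata.WritableJobData;)
    WritableJobData.setMapOutputKeyClass(conf, LongWritable.class);
    WritableJobData.setMapOutputValueClass(conf, Text.class);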

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java?rev=898019&r1=898018&r2=898019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java Mon Jan 11 19:22:42 2010
@@ -20,11 +20,14 @@
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration.IntegerRanges;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.serializer.DeserializerBase;
+import org.apache.hadoop.io.serializer.SerializerBase;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -241,16 +244,48 @@
     }
 
     @Override
+    @Deprecated
     public Class<?> getOutputKeyClass() {
       return mapContext.getOutputKeyClass();
     }
 
     @Override
+    @Deprecated
     public Class<?> getOutputValueClass() {
       return mapContext.getOutputValueClass();
     }
 
     @Override
+    public <T> SerializerBase<T> getMapOutputKeySerializer() {
+      return mapContext.getMapOutputKeySerializer();
+    }
+
+    @Override
+    public <T> SerializerBase<T> getMapOutputValueSerializer() {
+      return mapContext.getMapOutputValueSerializer();
+    }
+
+    @Override
+    public <T> DeserializerBase<T> getMapOutputKeyDeserializer() {
+      return mapContext.getMapOutputKeyDeserializer();
+    }
+
+    @Override
+    public <T> DeserializerBase<T> getMapOutputValueDeserializer() {
+      return mapContext.getMapOutputValueDeserializer();
+    }
+
+    @Override
+    public Map<String, String> getMapOutputKeySerializationMetadata() {
+      return mapContext.getMapOutputKeySerializationMetadata();
+    }
+
+    @Override
+    public Map<String, String> getMapOutputValueSerializationMetadata() {
+      return mapContext.getMapOutputValueSerializationMetadata();
+    }
+
+    @Override
     public Class<? extends Partitioner<?, ?>> getPartitionerClass()
         throws ClassNotFoundException {
       return mapContext.getPartitionerClass();

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java?rev=898019&r1=898018&r2=898019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java Mon Jan 11 19:22:42 2010
@@ -20,11 +20,14 @@
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration.IntegerRanges;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.serializer.DeserializerBase;
+import org.apache.hadoop.io.serializer.SerializerBase;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.JobID;
@@ -197,11 +200,13 @@
     }
 
     @Override
+    @Deprecated
     public Class<?> getMapOutputKeyClass() {
       return reduceContext.getMapOutputKeyClass();
     }
 
     @Override
+    @Deprecated
     public Class<?> getMapOutputValueClass() {
       return reduceContext.getMapOutputValueClass();
     }
@@ -244,6 +249,36 @@
     }
 
     @Override
+    public <T> SerializerBase<T> getMapOutputKeySerializer() {
+      return reduceContext.getMapOutputKeySerializer();
+    }
+
+    @Override
+    public <T> SerializerBase<T> getMapOutputValueSerializer() {
+      return reduceContext.getMapOutputValueSerializer();
+    }
+
+    @Override
+    public <T> DeserializerBase<T> getMapOutputKeyDeserializer() {
+      return reduceContext.getMapOutputKeyDeserializer();
+    }
+
+    @Override
+    public <T> DeserializerBase<T> getMapOutputValueDeserializer() {
+      return reduceContext.getMapOutputValueDeserializer();
+    }
+
+    @Override
+    public Map<String, String> getMapOutputKeySerializationMetadata() {
+      return reduceContext.getMapOutputKeySerializationMetadata();
+    }
+
+    @Override
+    public Map<String, String> getMapOutputValueSerializationMetadata() {
+      return reduceContext.getMapOutputValueSerializationMetadata();
+    }
+
+    @Override
     public Class<? extends Partitioner<?, ?>> getPartitionerClass()
         throws ClassNotFoundException {
       return reduceContext.getPartitionerClass();

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java?rev=898019&r1=898018&r2=898019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java Mon Jan 11 19:22:42 2010
@@ -20,11 +20,17 @@
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration.IntegerRanges;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.serializer.DeserializerBase;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.SerializerBase;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -35,6 +41,7 @@
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.jobdata.ClassBasedJobData;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 
@@ -44,6 +51,8 @@
  */
 public class JobContextImpl implements JobContext {
 
+  private final static Log LOG = LogFactory.getLog(JobContextImpl.class.getName());
+
   protected final org.apache.hadoop.mapred.JobConf conf;
   private final JobID jobId;
   
@@ -102,6 +111,7 @@
     return conf.getOutputValueClass();
   }
 
+  @Deprecated
   /**
    * Get the key class for the map output data. If it is not set, use the
    * (final) output key class. This allows the map output key class to be
@@ -109,9 +119,12 @@
    * @return the map output key class.
    */
   public Class<?> getMapOutputKeyClass() {
-    return conf.getMapOutputKeyClass();
+    LOG.warn(
+        "Deprecated: Use ClassBasedJobData.getMapOutputKeyClass() instead");
+    return ClassBasedJobData.getMapOutputKeyClass(conf);
   }
 
+  @Deprecated
   /**
    * Get the value class for the map output data. If it is not set, use the
    * (final) output value class This allows the map output value class to be
@@ -120,7 +133,9 @@
    * @return the map output value class.
    */
   public Class<?> getMapOutputValueClass() {
-    return conf.getMapOutputValueClass();
+    LOG.warn("Deprecated: Use ClassBasedJobData.getMapOutputValueClass() "
+        + "instead");
+    return ClassBasedJobData.getMapOutputValueClass(conf);
   }
 
   /**
@@ -206,6 +221,42 @@
   }
 
   /**
+   * Get the serializer to encode keys from the mapper.
+   *
+   * @return the {@link SerializerBase} for the mapper output keys.
+   */
+  public <T> SerializerBase<T> getMapOutputKeySerializer() {
+    return conf.getMapOutputKeySerializer();
+  }
+
+  /**
+   * Get the deserializer to decode keys from the mapper.
+   *
+   * @return the {@link DeserializerBase} for the mapper output keys.
+   */
+  public <T> DeserializerBase<T> getMapOutputKeyDeserializer() {
+    return conf.getMapOutputKeyDeserializer();
+  }
+
+  /**
+   * Get the serializer to encode values from the mapper.
+   *
+   * @return the {@link SerializerBase} for the mapper output values.
+   */
+  public <T> SerializerBase<T> getMapOutputValueSerializer() {
+    return conf.getMapOutputValueSerializer();
+  }
+
+  /**
+   * Get the deserializer to decode values from the mapper.
+   *
+   * @return the {@link DeserializerBase} for the mapper output values.
+   */
+  public <T> DeserializerBase<T> getMapOutputValueDeserializer() {
+    return conf.getMapOutputValueDeserializer();
+  }
+
+  /**
    * Get the {@link RawComparator} comparator used to compare keys.
    * 
    * @return the {@link RawComparator} comparator used to compare keys.
@@ -383,5 +434,26 @@
   public String getUser() {
     return conf.getUser();
   }
-  
+
+  /**
+   * Get the metadata used by the serialization framework to instantiate
+   * (de)serializers for key data emitted by mappers.
+   *
+   * @return the metadata used by the serialization framework for the mapper
+   * output key.
+   */
+  public Map<String, String> getMapOutputKeySerializationMetadata() {
+    return conf.getMapOutputKeySerializationMetadata();
+  }
+
+  /**
+   * Get the metadata used by the serialization framework to instantiate
+   * (de)serializers for value data emitted by mappers.
+   *
+   * @return the metadata used by the serialization framework for the mapper
+   * output value.
+   */
+  public Map<String, String> getMapOutputValueSerializationMetadata() {
+    return conf.getMapOutputValueSerializationMetadata();
+  }
 }
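
A task-side caller would typically use the new accessors along the following lines; a minimal sketch, assuming SerializerBase follows the usual open/serialize/close contract of Hadoop serializers, where K stands for the job's map output key type and out is any java.io.OutputStream:

    // Sketch: write one map output key with the context-provided serializer.
    SerializerBase<K> keySerializer = context.getMapOutputKeySerializer();
    keySerializer.open(out);
    keySerializer.serialize(key);
    keySerializer.close();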

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/ReduceContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/ReduceContextImpl.java?rev=898019&r1=898018&r2=898019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/ReduceContextImpl.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/ReduceContextImpl.java Mon Jan 11 19:22:42 2010
@@ -21,6 +21,7 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.NoSuchElementException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -70,8 +71,8 @@
   private boolean isMarked = false;
   private BackupStore<KEYIN,VALUEIN> backupStore;
   private final SerializationFactory serializationFactory;
-  private final Class<KEYIN> keyClass;
-  private final Class<VALUEIN> valueClass;
+  private final Map<String, String> keyMetadata;
+  private final Map<String, String> valueMetadata;
   private final Configuration conf;
   private final TaskAttemptID taskid;
   private int currentKeyLength = -1;
@@ -85,8 +86,8 @@
                            OutputCommitter committer,
                            StatusReporter reporter,
                            RawComparator<KEYIN> comparator,
-                           Class<KEYIN> keyClass,
-                           Class<VALUEIN> valueClass
+                           Map<String, String> keyMetadata,
+                           Map<String, String> valueMetadata
                           ) throws InterruptedException, IOException{
     super(conf, taskid, output, committer, reporter);
     this.input = input;
@@ -94,13 +95,13 @@
     this.inputValueCounter = inputValueCounter;
     this.comparator = comparator;
     this.serializationFactory = new SerializationFactory(conf);
-    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+    this.keyMetadata = keyMetadata;
+    this.valueMetadata = valueMetadata;
+    this.keyDeserializer = serializationFactory.getDeserializer(keyMetadata);
     this.keyDeserializer.open(buffer);
-    this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
+    this.valueDeserializer = serializationFactory.getDeserializer(valueMetadata);
     this.valueDeserializer.open(buffer);
     hasMore = input.next();
-    this.keyClass = keyClass;
-    this.valueClass = valueClass;
     this.conf = conf;
     this.taskid = taskid;
   }
@@ -326,12 +327,12 @@
       WritableUtils.writeVInt(out, currentKeyLength);
       WritableUtils.writeVInt(out, currentValueLength);
       Serializer<KEYIN> keySerializer = 
-        serializationFactory.getSerializer(keyClass);
+        serializationFactory.getSerializer(keyMetadata);
       keySerializer.open(out);
       keySerializer.serialize(getCurrentKey());
 
       Serializer<VALUEIN> valueSerializer = 
-        serializationFactory.getSerializer(valueClass);
+        serializationFactory.getSerializer(valueMetadata);
       valueSerializer.open(out);
       valueSerializer.serialize(getCurrentValue());
     }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java?rev=898019&r1=898018&r2=898019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java Mon Jan 11 19:22:42 2010
@@ -22,6 +22,7 @@
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
@@ -362,8 +363,6 @@
 
       RawKeyValueIterator rIter = 
         Merger.merge(jobConf, rfs,
-                     (Class<K>)jobConf.getMapOutputKeyClass(),
-                     (Class<V>)jobConf.getMapOutputValueClass(),
                      inMemorySegments, inMemorySegments.size(),
                      new Path(reduceId.toString()),
                      (RawComparator<K>)jobConf.getOutputKeyComparator(),
@@ -418,9 +417,7 @@
                                                Task.MERGED_OUTPUT_PREFIX);
 
       Writer<K,V> writer = 
-        new Writer<K,V>(jobConf, rfs, outputPath,
-                        (Class<K>) jobConf.getMapOutputKeyClass(),
-                        (Class<V>) jobConf.getMapOutputValueClass(),
+        new Writer<K,V>(jobConf, rfs, outputPath, true,
                         codec, null);
 
       RawKeyValueIterator rIter = null;
@@ -429,8 +426,6 @@
                  " segments...");
         
         rIter = Merger.merge(jobConf, rfs,
-                             (Class<K>)jobConf.getMapOutputKeyClass(),
-                             (Class<V>)jobConf.getMapOutputValueClass(),
                              inMemorySegments, inMemorySegments.size(),
                              new Path(reduceId.toString()),
                              (RawComparator<K>)jobConf.getOutputKeyComparator(),
@@ -499,16 +494,12 @@
         localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(), 
             approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
       Writer<K,V> writer = 
-        new Writer<K,V>(jobConf, rfs, outputPath, 
-                        (Class<K>) jobConf.getMapOutputKeyClass(), 
-                        (Class<V>) jobConf.getMapOutputValueClass(),
+        new Writer<K,V>(jobConf, rfs, outputPath,  true,
                         codec, null);
       RawKeyValueIterator iter  = null;
       Path tmpDir = new Path(reduceId.toString());
       try {
         iter = Merger.merge(jobConf, rfs,
-                            (Class<K>) jobConf.getMapOutputKeyClass(),
-                            (Class<V>) jobConf.getMapOutputValueClass(),
                             codec, inputs.toArray(new Path[inputs.size()]), 
                             true, ioSortFactor, tmpDir, 
                             (RawComparator<K>) jobConf.getOutputKeyComparator(), 
@@ -538,13 +529,15 @@
       Counters.Counter inCounter) throws IOException {
     JobConf job = jobConf;
     Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
-    Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
-    Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
+    Map<String, String> keyMetadata =
+        job.getMapOutputKeySerializationMetadata();
+    Map<String, String> valueMetadata =
+        job.getMapOutputValueSerializationMetadata();
     RawComparator<K> comparator = 
       (RawComparator<K>)job.getOutputKeyComparator();
     try {
       CombineValuesIterator values = new CombineValuesIterator(
-          kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
+          kvIter, comparator, keyMetadata, valueMetadata, job, Reporter.NULL,
           inCounter);
       while (values.more()) {
         combiner.reduce(values.getKey(), values, combineCollector,
@@ -638,8 +631,6 @@
     
 
     // merge config params
-    Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
-    Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();
     boolean keepInputs = job.getKeepFailedTaskFiles();
     final Path tmpDir = new Path(reduceId.toString());
     final RawComparator<K> comparator =
@@ -672,11 +663,11 @@
                                              inMemToDiskBytes).suffix(
                                                  Task.MERGED_OUTPUT_PREFIX);
         final RawKeyValueIterator rIter = Merger.merge(job, fs,
-            keyClass, valueClass, memDiskSegments, numMemDiskSegments,
+            memDiskSegments, numMemDiskSegments,
             tmpDir, comparator, reporter, spilledRecordsCounter, null, 
             mergePhase);
         final Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath,
-            keyClass, valueClass, codec, null);
+            true, codec, null);
         try {
           Merger.writeFile(rIter, writer, reporter, job);
           // add to list of final disk outputs.
@@ -746,7 +737,7 @@
       // merges. See comment where mergePhaseFinished is being set
       Progress thisPhase = (mergePhaseFinished) ? null : mergePhase; 
       RawKeyValueIterator diskMerge = Merger.merge(
-          job, fs, keyClass, valueClass, diskSegments,
+          job, fs, diskSegments,
           ioSortFactor, numInMemSegments, tmpDir, comparator,
           reporter, false, spilledRecordsCounter, null, thisPhase);
       diskSegments.clear();
@@ -756,7 +747,7 @@
       finalSegments.add(new Segment<K,V>(
             new RawKVIteratorReader(diskMerge, onDiskBytes), true));
     }
-    return Merger.merge(job, fs, keyClass, valueClass,
+    return Merger.merge(job, fs,
                  finalSegments, finalSegments.size(), tmpDir,
                  comparator, reporter, spilledRecordsCounter, null,
                  null);

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java?rev=898019&r1=898018&r2=898019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java Mon Jan 11 19:22:42 2010
@@ -25,6 +25,7 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.jobdata.WritableJobData;
 
 import java.io.*;
 
@@ -66,8 +67,8 @@
 
     conf.setInputFormat(TextInputFormat.class);
 
-    conf.setMapOutputKeyClass(LongWritable.class);
-    conf.setMapOutputValueClass(Text.class);
+    WritableJobData.setMapOutputKeyClass(conf, LongWritable.class);
+    WritableJobData.setMapOutputValueClass(conf, Text.class);
 
     conf.setOutputFormat(TextOutputFormat.class);
     conf.setOutputKeyClass(LongWritable.class);
@@ -101,4 +102,4 @@
 
   }
 
-}
\ No newline at end of file
+}

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java?rev=898019&r1=898018&r2=898019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java Mon Jan 11 19:22:42 2010
@@ -104,7 +104,6 @@
 
     conf.setOutputKeyClass(String.class);
     conf.setOutputValueClass(Long.class);
-    conf.setOutputKeyComparatorClass(JavaSerializationComparator.class);
 
     conf.setMapperClass(WordCountMapper.class);
     conf.setReducerClass(SumReducer.class);

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java?rev=898019&r1=898018&r2=898019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java Mon Jan 11 19:22:42 2010
@@ -29,6 +29,7 @@
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.lib.jobdata.WritableJobData;
 import org.apache.hadoop.util.Progressable;
 
 /**
@@ -79,8 +80,11 @@
     FileSystem localFs = FileSystem.getLocal(conf);
     FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
     Path path = new Path(tmpDir, "data.in");
+    JobConf job = new JobConf(conf);
+    WritableJobData.setMapOutputKeyClass(job, Text.class);
+    WritableJobData.setMapOutputValueClass(job, Text.class);
     IFile.Writer<Text, Text> writer = 
-      new IFile.Writer<Text, Text>(conf, rfs, path, Text.class, Text.class,
+      new IFile.Writer<Text, Text>(job, rfs, path, true,
                                    codec, null);
     for(Pair p: vals) {
       writer.append(new Text(p.key), new Text(p.value));
@@ -89,14 +93,16 @@
     
     @SuppressWarnings("unchecked")
     RawKeyValueIterator rawItr = 
-      Merger.merge(conf, rfs, Text.class, Text.class, codec, new Path[]{path}, 
+      Merger.merge(job, rfs, codec, new Path[]{path}, 
                    false, conf.getInt(JobContext.IO_SORT_FACTOR, 100), tmpDir, 
                    new Text.Comparator(), new NullProgress(), null, null, null);
     @SuppressWarnings("unchecked") // WritableComparators are not generic
     ReduceTask.ValuesIterator valItr = 
       new ReduceTask.ValuesIterator<Text,Text>(rawItr,
-          WritableComparator.get(Text.class), Text.class, Text.class,
-          conf, new NullProgress());
+          WritableComparator.get(Text.class),
+          job.getMapOutputKeySerializationMetadata(),
+          job.getMapOutputValueSerializationMetadata(),
+          job, new NullProgress());
     int i = 0;
     while (valItr.more()) {
       Object key = valItr.getKey();

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestAvroSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestAvroSerialization.java?rev=898019&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestAvroSerialization.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestAvroSerialization.java Mon Jan 11 19:22:42 2010
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import junit.framework.TestCase;
+
+import org.apache.avro.util.Utf8;
+import org.apache.avro.Schema;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Utils;
+import org.apache.hadoop.mapreduce.avro.WordCountKey;
+import org.apache.hadoop.mapreduce.avro.WordCountVal;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.jobdata.AvroGenericJobData;
+import org.apache.hadoop.mapreduce.lib.jobdata.AvroSpecificJobData;
+import org.apache.hadoop.mapreduce.lib.jobdata.AvroReflectJobData;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+
+/**
+ * Test the aspects of the MapReduce framework where serialization can be
+ * conducted via Avro instead of (e.g.) WritableSerialization.
+ */
+public class TestAvroSerialization extends TestCase {
+
+  private static String TEST_ROOT_DIR =
+    new File(System.getProperty("test.build.data", "/tmp")).toURI()
+    .toString().replace(' ', '+');
+
+  private final Path INPUT_DIR = new Path(TEST_ROOT_DIR + "/input");
+  private final Path OUTPUT_DIR = new Path(TEST_ROOT_DIR + "/out");
+  private final Path INPUT_FILE = new Path(INPUT_DIR , "inp");
+
+  // MapReduce classes using AvroGenericSerialization.
+
+  static class GenericWordCountMapper
+      extends Mapper<LongWritable, Text, Utf8, Long> {
+
+    public void map(LongWritable key, Text value,
+        Context context) throws IOException, InterruptedException {
+      StringTokenizer st = new StringTokenizer(value.toString());
+      while (st.hasMoreTokens()) {
+        context.write(new Utf8(st.nextToken()), 1L);
+      }
+    }
+  }
+
+  static class GenericSumReducer extends Reducer<Utf8, Long, Text, LongWritable> {
+
+    public void reduce(Utf8 key, Iterable<Long> values,
+        Context context) throws IOException, InterruptedException {
+
+      long sum = 0;
+      for (long val : values) {
+        sum += val;
+      }
+      context.write(new Text(key.toString()), new LongWritable(sum));
+    }
+  }
+
+  // MapReduce classes that use AvroSpecificSerialization.
+
+  static class SpecificWordCountMapper
+      extends Mapper<LongWritable, Text, WordCountKey, WordCountVal> {
+
+    public void map(LongWritable key, Text value,
+        Context context) throws IOException, InterruptedException {
+      StringTokenizer st = new StringTokenizer(value.toString());
+      WordCountKey outkey = new WordCountKey();
+      WordCountVal outval = new WordCountVal();
+      while (st.hasMoreTokens()) {
+        outkey.word = new Utf8(st.nextToken());
+        outval.subcount = 1;
+        context.write(outkey, outval);
+      }
+    }
+  }
+
+  static class SpecificSumReducer
+      extends Reducer<WordCountKey, WordCountVal, Text, LongWritable> {
+
+    public void reduce(WordCountKey key, Iterable<WordCountVal> values,
+        Context context) throws IOException, InterruptedException {
+
+      long sum = 0;
+      for (WordCountVal val : values) {
+        sum += val.subcount;
+      }
+      context.write(new Text(key.word.toString()), new LongWritable(sum));
+    }
+  }
+
+  // MapReduce classes that use AvroReflectSerialization.
+
+  static class ReflectableWordCountKey {
+    public Utf8 word = new Utf8("");
+  }
+
+  static class ReflectableWordCountVal {
+    public long subcount;
+  }
+
+  static class ReflectableWordCountMapper extends Mapper<LongWritable,
+      Text, ReflectableWordCountKey, ReflectableWordCountVal> {
+
+    public void map(LongWritable key, Text value,
+        Context context) throws IOException, InterruptedException {
+      StringTokenizer st = new StringTokenizer(value.toString());
+      ReflectableWordCountKey outkey = new ReflectableWordCountKey();
+      ReflectableWordCountVal outval = new ReflectableWordCountVal();
+      while (st.hasMoreTokens()) {
+        outkey.word = new Utf8(st.nextToken());
+        outval.subcount = 1;
+        context.write(outkey, outval);
+      }
+    }
+  }
+
+  static class ReflectableSumReducer
+      extends Reducer<ReflectableWordCountKey, ReflectableWordCountVal,
+      Text, LongWritable> {
+
+    public void reduce(ReflectableWordCountKey key,
+        Iterable<ReflectableWordCountVal> values,
+        Context context) throws IOException, InterruptedException {
+
+      long sum = 0;
+      for (ReflectableWordCountVal val : values) {
+        sum += val.subcount;
+      }
+      context.write(new Text(key.word.toString()), new LongWritable(sum));
+    }
+  }
+
+  private void cleanAndCreateInput(FileSystem fs) throws IOException {
+    fs.delete(INPUT_FILE, true);
+    fs.delete(OUTPUT_DIR, true);
+
+    OutputStream os = fs.create(INPUT_FILE);
+
+    Writer wr = new OutputStreamWriter(os);
+    wr.write("b a\n");
+    wr.close();
+  }
+
+  public void setUp() throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    cleanAndCreateInput(fs);
+  }
+
+  private Job createJob() throws IOException {
+    Configuration conf = new Configuration();
+    Job job = new Job(conf);
+    job.setJarByClass(TestAvroSerialization.class);
+    job.setInputFormatClass(TextInputFormat.class);
+
+    // Final output types are still Writable-based.
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(LongWritable.class);
+
+    FileInputFormat.setInputPaths(job, INPUT_DIR);
+    FileOutputFormat.setOutputPath(job, OUTPUT_DIR);
+
+    return job;
+  }
+
+  private void checkResults() throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    Path[] outputFiles = FileUtil.stat2Paths(
+        fs.listStatus(OUTPUT_DIR, 
+                      new Utils.OutputFileUtils.OutputFilesFilter()));
+    assertEquals(1, outputFiles.length);
+    InputStream is = fs.open(outputFiles[0]);
+    BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+    assertEquals("a\t1", reader.readLine());
+    assertEquals("b\t1", reader.readLine());
+    assertNull(reader.readLine());
+    reader.close();
+  }
+
+  // Test that we can use AvroGenericSerialization for intermediate data
+  public void testGenericIntermediateData() throws Exception {
+    Job job = createJob();
+    job.setJobName("AvroGenericSerialization");
+
+    job.setMapperClass(GenericWordCountMapper.class);
+    job.setReducerClass(GenericSumReducer.class);
+
+    // Set intermediate types based on Avro schemas.
+    Schema keySchema = Schema.create(Schema.Type.STRING);
+    Schema valSchema = Schema.create(Schema.Type.LONG);
+    AvroGenericJobData.setMapOutputKeySchema(job.getConfiguration(),
+        keySchema);
+    AvroGenericJobData.setMapOutputValueSchema(job.getConfiguration(),
+        valSchema);
+
+    job.waitForCompletion(false);
+
+    checkResults();
+  }
+
+  public void testSpecificIntermediateData() throws Exception {
+    Job job = createJob();
+    job.setJobName("AvroSpecificSerialization");
+
+    job.setMapperClass(SpecificWordCountMapper.class);
+    job.setReducerClass(SpecificSumReducer.class);
+
+    // Set intermediate types based on specific-records.
+    AvroSpecificJobData.setMapOutputKeyClass(job.getConfiguration(),
+        WordCountKey.class);
+    AvroSpecificJobData.setMapOutputValueClass(job.getConfiguration(),
+        WordCountVal.class);
+
+    job.waitForCompletion(false);
+
+    checkResults();
+  }
+
+  public void testReflectIntermediateData() throws Exception {
+    Job job = createJob();
+    job.setJobName("AvroReflectSerialization");
+
+    job.setMapperClass(ReflectableWordCountMapper.class);
+    job.setReducerClass(ReflectableSumReducer.class);
+
+    // Set intermediate types based on reflection records.
+    job.getConfiguration().set("avro.reflect.pkgs", "org.apache.hadoop.mapreduce");
+    AvroReflectJobData.setMapOutputKeyClass(job.getConfiguration(),
+        ReflectableWordCountKey.class);
+    AvroReflectJobData.setMapOutputValueClass(job.getConfiguration(),
+        ReflectableWordCountVal.class);
+
+    job.waitForCompletion(false);
+
+    checkResults();
+  }
+}

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/avro/key.avsc
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/avro/key.avsc?rev=898019&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/avro/key.avsc (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/avro/key.avsc Mon Jan 11 19:22:42 2010
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"type" : "record", "name":"WordCountKey", "namespace": "org.apache.hadoop.mapreduce.avro",
+  "fields" : [ { "name":"word", "type":"string" } ]
+}

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/avro/val.avsc
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/avro/val.avsc?rev=898019&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/avro/val.avsc (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/avro/val.avsc Mon Jan 11 19:22:42 2010
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"type" : "record", "name":"WordCountVal", "namespace": "org.apache.hadoop.mapreduce.avro",
+  "fields" : [ { "name":"subcount", "type":"long" } ]
+}

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java?rev=898019&r1=898018&r2=898019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/HadoopArchives.java Mon Jan 11 19:22:42 2010
@@ -63,6 +63,7 @@
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.lib.jobdata.WritableJobData;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Tool;
@@ -441,8 +442,8 @@
     conf.setOutputFormat(NullOutputFormat.class);
     conf.setMapperClass(HArchivesMapper.class);
     conf.setReducerClass(HArchivesReducer.class);
-    conf.setMapOutputKeyClass(IntWritable.class);
-    conf.setMapOutputValueClass(Text.class);
+    WritableJobData.setMapOutputKeyClass(conf, IntWritable.class);
+    WritableJobData.setMapOutputValueClass(conf, Text.class);
     conf.set(JobContext.HISTORY_LOCATION, "none");
     FileInputFormat.addInputPath(conf, jobDirectory);
     //make sure no speculative execution is done


