hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r811134 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/lib/ src/java/org/apache/hadoop/mapreduce/ src/java/org/apache/hadoop/mapreduce/lib/chain/ src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/
Date Thu, 03 Sep 2009 21:31:21 GMT
Author: acmurthy
Date: Thu Sep  3 21:31:20 2009
New Revision: 811134

URL: http://svn.apache.org/viewvc?rev=811134&view=rev
Log:
Reverted MAPREDUCE-372.

Removed:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/Chain.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/ChainReducer.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ReduceContext.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=811134&r1=811133&r2=811134&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Sep  3 21:31:20 2009
@@ -216,9 +216,6 @@
     MAPREDUCE-779. Added node health failure counts into 
     JobTrackerStatistics. (Sreekanth Ramakrishnan via yhemanth)
 
-    MAPREDUCE-372. Moves ChainMapper/Reducer to the new API.
-    (Amareshwari Sriramadasu via ddas)
-
     MAPREDUCE-789. Oracle support for Sqoop. (Aaron Kimball via tomwhite)
 
     MAPREDUCE-842. Setup secure permissions for localized job files,

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/Chain.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/Chain.java?rev=811134&r1=811133&r2=811134&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/Chain.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/Chain.java Thu Sep  3 21:31:20 2009
@@ -18,6 +18,8 @@
 package org.apache.hadoop.mapred.lib;
 
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Stringifier;
+import org.apache.hadoop.io.DefaultStringifier;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.Serialization;
 import org.apache.hadoop.io.serializer.SerializationFactory;
@@ -30,19 +32,45 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 
 /**
  * The Chain class provides all the common functionality for the
  * {@link ChainMapper} and the {@link ChainReducer} classes.
- * @deprecated Use {@link org.apache.hadoop.mapreduce.lib.chain.Chain} instead
  */
-@Deprecated
-class Chain extends org.apache.hadoop.mapreduce.lib.chain.Chain {
+class Chain {
+  private static final String CHAIN_MAPPER = "chain.mapper";
+  private static final String CHAIN_REDUCER = "chain.reducer";
+
+  private static final String CHAIN_MAPPER_SIZE = ".size";
+  private static final String CHAIN_MAPPER_CLASS = ".mapper.class.";
+  private static final String CHAIN_MAPPER_CONFIG = ".mapper.config.";
+  private static final String CHAIN_REDUCER_CLASS = ".reducer.class";
+  private static final String CHAIN_REDUCER_CONFIG = ".reducer.config";
 
   private static final String MAPPER_BY_VALUE = "chain.mapper.byValue";
   private static final String REDUCER_BY_VALUE = "chain.reducer.byValue";
 
+  private static final String MAPPER_INPUT_KEY_CLASS =
+    "chain.mapper.input.key.class";
+  private static final String MAPPER_INPUT_VALUE_CLASS =
+    "chain.mapper.input.value.class";
+  private static final String MAPPER_OUTPUT_KEY_CLASS =
+    "chain.mapper.output.key.class";
+  private static final String MAPPER_OUTPUT_VALUE_CLASS =
+    "chain.mapper.output.value.class";
+  private static final String REDUCER_INPUT_KEY_CLASS =
+    "chain.reducer.input.key.class";
+  private static final String REDUCER_INPUT_VALUE_CLASS =
+    "chain.reducer.input.value.class";
+  private static final String REDUCER_OUTPUT_KEY_CLASS =
+    "chain.reducer.output.key.class";
+  private static final String REDUCER_OUTPUT_VALUE_CLASS =
+    "chain.reducer.output.value.class";
+
+  private boolean isMap;
+
   private JobConf chainJobConf;
 
   private List<Mapper> mappers = new ArrayList<Mapper>();
@@ -64,7 +92,51 @@
    *              Reducer.
    */
   Chain(boolean isMap) {
-    super(isMap);
+    this.isMap = isMap;
+  }
+
+  /**
+   * Returns the prefix to use for the configuration of the chain depending
+   * if it is for a Mapper or a Reducer.
+   *
+   * @param isMap TRUE for Mapper, FALSE for Reducer.
+   * @return the prefix to use.
+   */
+  private static String getPrefix(boolean isMap) {
+    return (isMap) ? CHAIN_MAPPER : CHAIN_REDUCER;
+  }
+
+  /**
+   * Creates a {@link JobConf} for one of the Maps or Reduce in the chain.
+   * <p/>
+   * It creates a new JobConf using the chain job's JobConf as base and adds to
+   * it the configuration properties for the chain element. The keys of the
+   * chain element jobConf have precedence over the given JobConf.
+   *
+   * @param jobConf the chain job's JobConf.
+   * @param confKey the key for chain element configuration serialized in the
+   *                chain job's JobConf.
+   * @return a new JobConf aggregating the chain job's JobConf with the chain
+   *         element configuration properties.
+   */
+  private static JobConf getChainElementConf(JobConf jobConf, String confKey) {
+    JobConf conf;
+    try {
+      Stringifier<JobConf> stringifier =
+        new DefaultStringifier<JobConf>(jobConf, JobConf.class);
+      conf = stringifier.fromString(jobConf.get(confKey, null));
+    } catch (IOException ioex) {
+      throw new RuntimeException(ioex);
+    }
+    // we have to do this because the Writable desearialization clears all
+    // values set in the conf making not possible do do a new JobConf(jobConf)
+    // in the creation of the conf above
+    jobConf = new JobConf(jobConf);
+
+    for(Map.Entry<String, String> entry : conf) {
+      jobConf.set(entry.getKey(), entry.getValue());
+    }
+    return jobConf;
   }
 
   /**
@@ -97,27 +169,82 @@
     String prefix = getPrefix(isMap);
 
     // if a reducer chain check the Reducer has been already set
-    checkReducerAlreadySet(isMap, jobConf, prefix, true);
-	    
-    // set the mapper class
-    int index = getIndex(jobConf, prefix);
+    if (!isMap) {
+      if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS,
+                           Reducer.class) == null) {
+        throw new IllegalStateException(
+          "A Mapper can be added to the chain only after the Reducer has " +
+          "been set");
+      }
+    }
+    int index = jobConf.getInt(prefix + CHAIN_MAPPER_SIZE, 0);
     jobConf.setClass(prefix + CHAIN_MAPPER_CLASS + index, klass, Mapper.class);
-	    
-    validateKeyValueTypes(isMap, jobConf, inputKeyClass, inputValueClass,
-      outputKeyClass, outputValueClass, index, prefix);
-	    
+
+    // if it is a reducer chain and the first Mapper is being added check the
+    // key and value input classes of the mapper match those of the reducer
+    // output.
+    if (!isMap && index == 0) {
+      JobConf reducerConf =
+        getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG);
+      if (! inputKeyClass.isAssignableFrom(
+        reducerConf.getClass(REDUCER_OUTPUT_KEY_CLASS, null))) {
+        throw new IllegalArgumentException("The Reducer output key class does" +
+          " not match the Mapper input key class");
+      }
+      if (! inputValueClass.isAssignableFrom(
+        reducerConf.getClass(REDUCER_OUTPUT_VALUE_CLASS, null))) {
+        throw new IllegalArgumentException("The Reducer output value class" +
+          " does not match the Mapper input value class");
+      }
+    } else if (index > 0) {
+      // check the that the new Mapper in the chain key and value input classes
+      // match those of the previous Mapper output.
+      JobConf previousMapperConf =
+        getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG +
+          (index - 1));
+      if (! inputKeyClass.isAssignableFrom(
+        previousMapperConf.getClass(MAPPER_OUTPUT_KEY_CLASS, null))) {
+        throw new IllegalArgumentException("The Mapper output key class does" +
+          " not match the previous Mapper input key class");
+      }
+      if (! inputValueClass.isAssignableFrom(
+        previousMapperConf.getClass(MAPPER_OUTPUT_VALUE_CLASS, null))) {
+        throw new IllegalArgumentException("The Mapper output value class" +
+          " does not match the previous Mapper input value class");
+      }
+    }
+
     // if the Mapper does not have a private JobConf create an empty one
     if (mapperConf == null) {
-    // using a JobConf without defaults to make it lightweight.
-    // still the chain JobConf may have all defaults and this conf is
-    // overlapped to the chain JobConf one.
+      // using a JobConf without defaults to make it lightweight.
+      // still the chain JobConf may have all defaults and this conf is
+      // overlapped to the chain JobConf one.
       mapperConf = new JobConf(true);
     }
-    // store in the private mapper conf if it works by value or by reference
+
+    // store in the private mapper conf the input/output classes of the mapper
+    // and if it works by value or by reference
     mapperConf.setBoolean(MAPPER_BY_VALUE, byValue);
-    
-    setMapperConf(isMap, jobConf, inputKeyClass, inputValueClass,
-	      outputKeyClass, outputValueClass, mapperConf, index, prefix);
+    mapperConf.setClass(MAPPER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
+    mapperConf.setClass(MAPPER_INPUT_VALUE_CLASS, inputValueClass,
+                        Object.class);
+    mapperConf.setClass(MAPPER_OUTPUT_KEY_CLASS, outputKeyClass, Object.class);
+    mapperConf.setClass(MAPPER_OUTPUT_VALUE_CLASS, outputValueClass,
+                        Object.class);
+
+    // serialize the private mapper jobconf in the chain jobconf.
+    Stringifier<JobConf> stringifier =
+      new DefaultStringifier<JobConf>(jobConf, JobConf.class);
+    try {
+      jobConf.set(prefix + CHAIN_MAPPER_CONFIG + index,
+                  stringifier.toString(new JobConf(mapperConf)));
+    }
+    catch (IOException ioEx) {
+      throw new RuntimeException(ioEx);
+    }
+
+    // increment the chain counter
+    jobConf.setInt(prefix + CHAIN_MAPPER_SIZE, index + 1);
   }
 
   /**
@@ -146,10 +273,13 @@
                           Class<? extends V2> outputValueClass,
                           boolean byValue, JobConf reducerConf) {
     String prefix = getPrefix(false);
-    checkReducerAlreadySet(false, jobConf, prefix, false);
+
+    if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null) != null) {
+      throw new IllegalStateException("Reducer has been already set");
+    }
 
     jobConf.setClass(prefix + CHAIN_REDUCER_CLASS, klass, Reducer.class);
-    
+
     // if the Reducer does not have a private JobConf create an empty one
     if (reducerConf == null) {
       // using a JobConf without defaults to make it lightweight.
@@ -161,9 +291,24 @@
     // store in the private reducer conf the input/output classes of the reducer
     // and if it works by value or by reference
     reducerConf.setBoolean(MAPPER_BY_VALUE, byValue);
-
-    setReducerConf(jobConf, inputKeyClass, inputValueClass, outputKeyClass,
-      outputValueClass, reducerConf, prefix);
+    reducerConf.setClass(REDUCER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
+    reducerConf.setClass(REDUCER_INPUT_VALUE_CLASS, inputValueClass,
+                         Object.class);
+    reducerConf.setClass(REDUCER_OUTPUT_KEY_CLASS, outputKeyClass,
+                         Object.class);
+    reducerConf.setClass(REDUCER_OUTPUT_VALUE_CLASS, outputValueClass,
+                         Object.class);
+
+    // serialize the private mapper jobconf in the chain jobconf.
+    Stringifier<JobConf> stringifier =
+      new DefaultStringifier<JobConf>(jobConf, JobConf.class);
+    try {
+      jobConf.set(prefix + CHAIN_REDUCER_CONFIG,
+                  stringifier.toString(new JobConf(reducerConf)));
+    }
+    catch (IOException ioEx) {
+      throw new RuntimeException(ioEx);
+    }
   }
 
   /**
@@ -180,8 +325,8 @@
     for (int i = 0; i < index; i++) {
       Class<? extends Mapper> klass =
         jobConf.getClass(prefix + CHAIN_MAPPER_CLASS + i, null, Mapper.class);
-      JobConf mConf = new JobConf(
-        getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG + i));
+      JobConf mConf =
+        getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG + i);
       Mapper mapper = ReflectionUtils.newInstance(klass, mConf);
       mappers.add(mapper);
 
@@ -198,8 +343,8 @@
     Class<? extends Reducer> klass =
       jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null, Reducer.class);
     if (klass != null) {
-      JobConf rConf = new JobConf(
-        getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG));
+      JobConf rConf =
+        getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG);
       reducer = ReflectionUtils.newInstance(klass, rConf);
       if (rConf.getBoolean(REDUCER_BY_VALUE, true)) {
         reducerKeySerialization = serializationFactory

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java?rev=811134&r1=811133&r2=811134&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java Thu Sep  3 21:31:20 2009
@@ -86,10 +86,7 @@
  * RunningJob job = jc.submitJob(conf);
  * ...
  * </pre>
- * @deprecated 
- * Use {@link org.apache.hadoop.mapreduce.lib.chain.ChainMapper} instead
  */
-@Deprecated
 public class ChainMapper implements Mapper {
 
   /**

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/ChainReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/ChainReducer.java?rev=811134&r1=811133&r2=811134&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/ChainReducer.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/ChainReducer.java Thu Sep  3 21:31:20 2009
@@ -86,10 +86,7 @@
  * RunningJob job = jc.submitJob(conf);
  * ...
  * </pre>
- * @deprecated 
- * Use {@link org.apache.hadoop.mapreduce.lib.chain.ChainReducer} instead
  */
-@Deprecated
 public class ChainReducer implements Reducer {
 
   /**

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ReduceContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ReduceContext.java?rev=811134&r1=811133&r2=811134&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ReduceContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ReduceContext.java Thu Sep  3 21:31:20 2009
@@ -94,22 +94,6 @@
     this.taskid = taskid;
   }
 
-  public RawKeyValueIterator getIterator() {
-    return input;
-  }
-  
-  public Counter getInputCounter() {
-    return inputCounter;
-  }
-  
-  public RawComparator<KEYIN> getComparator() {
-    return comparator;
-  }
-  
-  public boolean hasMore() {
-    return hasMore;
-  }
-  
   /** Start processing next unique key. */
   public boolean nextKey() throws IOException,InterruptedException {
     while (hasMore && nextKeyIsSame) {



Mime
View raw message