hadoop-mapreduce-commits mailing list archives

From d...@apache.org
Subject svn commit: r803019 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/lib/ src/java/org/apache/hadoop/mapreduce/ src/java/org/apache/hadoop/mapreduce/lib/chain/ src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/
Date Tue, 11 Aug 2009 07:56:50 GMT
Author: ddas
Date: Tue Aug 11 07:56:49 2009
New Revision: 803019

URL: http://svn.apache.org/viewvc?rev=803019&view=rev
Log:
MAPREDUCE-372. Moves ChainMapper/Reducer to the new API. Contributed by Amareshwari Sriramadasu.
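
For reference, a driver written against the new API follows the usage pattern documented in the ChainMapper javadoc added by this commit. The sketch below is illustrative only (it is not part of the commit); AMap and BMap are hypothetical Mapper implementations, while the ChainMapper.addMapper signature matches the one added here:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;

    Configuration conf = new Configuration();
    Job job = new Job(conf);

    // per-mapper configurations; Configuration(false) skips loading defaults,
    // keeping the conf that gets serialized into the job lightweight
    Configuration mapAConf = new Configuration(false);
    ChainMapper.addMapper(job, AMap.class, LongWritable.class, Text.class,
        Text.class, Text.class, mapAConf);

    Configuration mapBConf = new Configuration(false);
    ChainMapper.addMapper(job, BMap.class, Text.class, Text.class,
        LongWritable.class, Text.class, mapBConf);

    job.waitForCompletion(true);

Unlike the old org.apache.hadoop.mapred.lib API, addMapper no longer takes a byValue flag: keys and values are always passed between chain elements by value.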

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/Chain.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapper.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReducer.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestChainErrors.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestChainWordCount.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestMapReduceChain.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/Chain.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/ChainReducer.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ReduceContext.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=803019&r1=803018&r2=803019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Aug 11 07:56:49 2009
@@ -181,6 +181,9 @@
     MAPREDUCE-779. Added node health failure counts into 
     JobTrackerStatistics. (Sreekanth Ramakrishnan via yhemanth)
 
+    MAPREDUCE-372. Moves ChainMapper/Reducer to the new API.
+    (Amareshwari Sriramadasu via ddas)
+
   BUG FIXES
     MAPREDUCE-703. Sqoop requires dependency on hsqldb in ivy.
     (Aaron Kimball via matei)

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/Chain.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/Chain.java?rev=803019&r1=803018&r2=803019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/Chain.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/Chain.java Tue Aug 11 07:56:49 2009
@@ -18,8 +18,6 @@
 package org.apache.hadoop.mapred.lib;
 
 import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Stringifier;
-import org.apache.hadoop.io.DefaultStringifier;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.Serialization;
 import org.apache.hadoop.io.serializer.SerializationFactory;
@@ -32,45 +30,19 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 
 /**
  * The Chain class provides all the common functionality for the
  * {@link ChainMapper} and the {@link ChainReducer} classes.
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.lib.chain.Chain} instead
  */
-class Chain {
-  private static final String CHAIN_MAPPER = "chain.mapper";
-  private static final String CHAIN_REDUCER = "chain.reducer";
-
-  private static final String CHAIN_MAPPER_SIZE = ".size";
-  private static final String CHAIN_MAPPER_CLASS = ".mapper.class.";
-  private static final String CHAIN_MAPPER_CONFIG = ".mapper.config.";
-  private static final String CHAIN_REDUCER_CLASS = ".reducer.class";
-  private static final String CHAIN_REDUCER_CONFIG = ".reducer.config";
+@Deprecated
+class Chain extends org.apache.hadoop.mapreduce.lib.chain.Chain {
 
   private static final String MAPPER_BY_VALUE = "chain.mapper.byValue";
   private static final String REDUCER_BY_VALUE = "chain.reducer.byValue";
 
-  private static final String MAPPER_INPUT_KEY_CLASS =
-    "chain.mapper.input.key.class";
-  private static final String MAPPER_INPUT_VALUE_CLASS =
-    "chain.mapper.input.value.class";
-  private static final String MAPPER_OUTPUT_KEY_CLASS =
-    "chain.mapper.output.key.class";
-  private static final String MAPPER_OUTPUT_VALUE_CLASS =
-    "chain.mapper.output.value.class";
-  private static final String REDUCER_INPUT_KEY_CLASS =
-    "chain.reducer.input.key.class";
-  private static final String REDUCER_INPUT_VALUE_CLASS =
-    "chain.reducer.input.value.class";
-  private static final String REDUCER_OUTPUT_KEY_CLASS =
-    "chain.reducer.output.key.class";
-  private static final String REDUCER_OUTPUT_VALUE_CLASS =
-    "chain.reducer.output.value.class";
-
-  private boolean isMap;
-
   private JobConf chainJobConf;
 
   private List<Mapper> mappers = new ArrayList<Mapper>();
@@ -92,51 +64,7 @@
    *              Reducer.
    */
   Chain(boolean isMap) {
-    this.isMap = isMap;
-  }
-
-  /**
-   * Returns the prefix to use for the configuration of the chain depending
-   * if it is for a Mapper or a Reducer.
-   *
-   * @param isMap TRUE for Mapper, FALSE for Reducer.
-   * @return the prefix to use.
-   */
-  private static String getPrefix(boolean isMap) {
-    return (isMap) ? CHAIN_MAPPER : CHAIN_REDUCER;
-  }
-
-  /**
-   * Creates a {@link JobConf} for one of the Maps or Reduce in the chain.
-   * <p/>
-   * It creates a new JobConf using the chain job's JobConf as base and adds to
-   * it the configuration properties for the chain element. The keys of the
-   * chain element jobConf have precedence over the given JobConf.
-   *
-   * @param jobConf the chain job's JobConf.
-   * @param confKey the key for chain element configuration serialized in the
-   *                chain job's JobConf.
-   * @return a new JobConf aggregating the chain job's JobConf with the chain
-   *         element configuration properties.
-   */
-  private static JobConf getChainElementConf(JobConf jobConf, String confKey) {
-    JobConf conf;
-    try {
-      Stringifier<JobConf> stringifier =
-        new DefaultStringifier<JobConf>(jobConf, JobConf.class);
-      conf = stringifier.fromString(jobConf.get(confKey, null));
-    } catch (IOException ioex) {
-      throw new RuntimeException(ioex);
-    }
-    // we have to do this because the Writable desearialization clears all
-    // values set in the conf making not possible do do a new JobConf(jobConf)
-    // in the creation of the conf above
-    jobConf = new JobConf(jobConf);
-
-    for(Map.Entry<String, String> entry : conf) {
-      jobConf.set(entry.getKey(), entry.getValue());
-    }
-    return jobConf;
+    super(isMap);
   }
 
   /**
@@ -169,82 +97,27 @@
     String prefix = getPrefix(isMap);
 
     // if a reducer chain check the Reducer has been already set
-    if (!isMap) {
-      if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS,
-                           Reducer.class) == null) {
-        throw new IllegalStateException(
-          "A Mapper can be added to the chain only after the Reducer has " +
-          "been set");
-      }
-    }
-    int index = jobConf.getInt(prefix + CHAIN_MAPPER_SIZE, 0);
+    checkReducerAlreadySet(isMap, jobConf, prefix, true);
+
+    // set the mapper class
+    int index = getIndex(jobConf, prefix);
     jobConf.setClass(prefix + CHAIN_MAPPER_CLASS + index, klass, Mapper.class);
-
-    // if it is a reducer chain and the first Mapper is being added check the
-    // key and value input classes of the mapper match those of the reducer
-    // output.
-    if (!isMap && index == 0) {
-      JobConf reducerConf =
-        getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG);
-      if (! inputKeyClass.isAssignableFrom(
-        reducerConf.getClass(REDUCER_OUTPUT_KEY_CLASS, null))) {
-        throw new IllegalArgumentException("The Reducer output key class does" +
-          " not match the Mapper input key class");
-      }
-      if (! inputValueClass.isAssignableFrom(
-        reducerConf.getClass(REDUCER_OUTPUT_VALUE_CLASS, null))) {
-        throw new IllegalArgumentException("The Reducer output value class" +
-          " does not match the Mapper input value class");
-      }
-    } else if (index > 0) {
-      // check the that the new Mapper in the chain key and value input classes
-      // match those of the previous Mapper output.
-      JobConf previousMapperConf =
-        getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG +
-          (index - 1));
-      if (! inputKeyClass.isAssignableFrom(
-        previousMapperConf.getClass(MAPPER_OUTPUT_KEY_CLASS, null))) {
-        throw new IllegalArgumentException("The Mapper output key class does" +
-          " not match the previous Mapper input key class");
-      }
-      if (! inputValueClass.isAssignableFrom(
-        previousMapperConf.getClass(MAPPER_OUTPUT_VALUE_CLASS, null))) {
-        throw new IllegalArgumentException("The Mapper output value class" +
-          " does not match the previous Mapper input value class");
-      }
-    }
-
+
+    validateKeyValueTypes(isMap, jobConf, inputKeyClass, inputValueClass,
+      outputKeyClass, outputValueClass, index, prefix);
+
     // if the Mapper does not have a private JobConf create an empty one
     if (mapperConf == null) {
-      // using a JobConf without defaults to make it lightweight.
-      // still the chain JobConf may have all defaults and this conf is
-      // overlapped to the chain JobConf one.
+    // using a JobConf without defaults to make it lightweight.
+    // still the chain JobConf may have all defaults and this conf is
+    // overlapped to the chain JobConf one.
       mapperConf = new JobConf(true);
     }
-
-    // store in the private mapper conf the input/output classes of the mapper
-    // and if it works by value or by reference
+    // store in the private mapper conf if it works by value or by reference
     mapperConf.setBoolean(MAPPER_BY_VALUE, byValue);
-    mapperConf.setClass(MAPPER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
-    mapperConf.setClass(MAPPER_INPUT_VALUE_CLASS, inputValueClass,
-                        Object.class);
-    mapperConf.setClass(MAPPER_OUTPUT_KEY_CLASS, outputKeyClass, Object.class);
-    mapperConf.setClass(MAPPER_OUTPUT_VALUE_CLASS, outputValueClass,
-                        Object.class);
-
-    // serialize the private mapper jobconf in the chain jobconf.
-    Stringifier<JobConf> stringifier =
-      new DefaultStringifier<JobConf>(jobConf, JobConf.class);
-    try {
-      jobConf.set(prefix + CHAIN_MAPPER_CONFIG + index,
-                  stringifier.toString(new JobConf(mapperConf)));
-    }
-    catch (IOException ioEx) {
-      throw new RuntimeException(ioEx);
-    }
-
-    // increment the chain counter
-    jobConf.setInt(prefix + CHAIN_MAPPER_SIZE, index + 1);
+
+    setMapperConf(isMap, jobConf, inputKeyClass, inputValueClass,
+      outputKeyClass, outputValueClass, mapperConf, index, prefix);
   }
 
   /**
@@ -273,13 +146,10 @@
                           Class<? extends V2> outputValueClass,
                           boolean byValue, JobConf reducerConf) {
     String prefix = getPrefix(false);
-
-    if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null) != null) {
-      throw new IllegalStateException("Reducer has been already set");
-    }
+    checkReducerAlreadySet(false, jobConf, prefix, false);
 
     jobConf.setClass(prefix + CHAIN_REDUCER_CLASS, klass, Reducer.class);
-
+    
     // if the Reducer does not have a private JobConf create an empty one
     if (reducerConf == null) {
       // using a JobConf without defaults to make it lightweight.
@@ -291,24 +161,9 @@
     // store in the private reducer conf the input/output classes of the reducer
     // and if it works by value or by reference
     reducerConf.setBoolean(MAPPER_BY_VALUE, byValue);
-    reducerConf.setClass(REDUCER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
-    reducerConf.setClass(REDUCER_INPUT_VALUE_CLASS, inputValueClass,
-                         Object.class);
-    reducerConf.setClass(REDUCER_OUTPUT_KEY_CLASS, outputKeyClass,
-                         Object.class);
-    reducerConf.setClass(REDUCER_OUTPUT_VALUE_CLASS, outputValueClass,
-                         Object.class);
-
-    // serialize the private mapper jobconf in the chain jobconf.
-    Stringifier<JobConf> stringifier =
-      new DefaultStringifier<JobConf>(jobConf, JobConf.class);
-    try {
-      jobConf.set(prefix + CHAIN_REDUCER_CONFIG,
-                  stringifier.toString(new JobConf(reducerConf)));
-    }
-    catch (IOException ioEx) {
-      throw new RuntimeException(ioEx);
-    }
+
+    setReducerConf(jobConf, inputKeyClass, inputValueClass, outputKeyClass,
+      outputValueClass, reducerConf, prefix);
   }
 
   /**
@@ -325,8 +180,8 @@
     for (int i = 0; i < index; i++) {
       Class<? extends Mapper> klass =
         jobConf.getClass(prefix + CHAIN_MAPPER_CLASS + i, null, Mapper.class);
-      JobConf mConf =
-        getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG + i);
+      JobConf mConf = new JobConf(
+        getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG + i));
       Mapper mapper = ReflectionUtils.newInstance(klass, mConf);
       mappers.add(mapper);
 
@@ -343,8 +198,8 @@
     Class<? extends Reducer> klass =
       jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null, Reducer.class);
     if (klass != null) {
-      JobConf rConf =
-        getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG);
+      JobConf rConf = new JobConf(
+        getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG));
       reducer = ReflectionUtils.newInstance(klass, rConf);
       if (rConf.getBoolean(REDUCER_BY_VALUE, true)) {
         reducerKeySerialization = serializationFactory

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java?rev=803019&r1=803018&r2=803019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java Tue Aug 11 07:56:49 2009
@@ -86,7 +86,10 @@
  * RunningJob job = jc.submitJob(conf);
  * ...
  * </pre>
+ * @deprecated 
+ * Use {@link org.apache.hadoop.mapreduce.lib.chain.ChainMapper} instead
  */
+@Deprecated
 public class ChainMapper implements Mapper {
 
   /**

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/ChainReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/ChainReducer.java?rev=803019&r1=803018&r2=803019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/ChainReducer.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/ChainReducer.java Tue Aug 11 07:56:49 2009
@@ -86,7 +86,10 @@
  * RunningJob job = jc.submitJob(conf);
  * ...
  * </pre>
+ * @deprecated 
+ * Use {@link org.apache.hadoop.mapreduce.lib.chain.ChainReducer} instead
  */
+@Deprecated
 public class ChainReducer implements Reducer {
 
   /**

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ReduceContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ReduceContext.java?rev=803019&r1=803018&r2=803019&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ReduceContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ReduceContext.java Tue Aug 11 07:56:49 2009
@@ -94,6 +94,22 @@
     this.taskid = taskid;
   }
 
+  public RawKeyValueIterator getIterator() {
+    return input;
+  }
+  
+  public Counter getInputCounter() {
+    return inputCounter;
+  }
+  
+  public RawComparator<KEYIN> getComparator() {
+    return comparator;
+  }
+  
+  public boolean hasMore() {
+    return hasMore;
+  }
+  
   /** Start processing next unique key. */
   public boolean nextKey() throws IOException,InterruptedException {
     while (hasMore && nextKeyIsSame) {

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/Chain.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/Chain.java?rev=803019&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/Chain.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/Chain.java Tue Aug 11 07:56:49 2009
@@ -0,0 +1,954 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.chain;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Stringifier;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.ReduceContext;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * The Chain class provides all the common functionality for the
+ * {@link ChainMapper} and the {@link ChainReducer} classes.
+ */
+public class Chain {
+  protected static final String CHAIN_MAPPER = "chain.mapper";
+  protected static final String CHAIN_REDUCER = "chain.reducer";
+
+  protected static final String CHAIN_MAPPER_SIZE = ".size";
+  protected static final String CHAIN_MAPPER_CLASS = ".mapper.class.";
+  protected static final String CHAIN_MAPPER_CONFIG = ".mapper.config.";
+  protected static final String CHAIN_REDUCER_CLASS = ".reducer.class";
+  protected static final String CHAIN_REDUCER_CONFIG = ".reducer.config";
+
+
+  protected static final String MAPPER_INPUT_KEY_CLASS =
+    "chain.mapper.input.key.class";
+  protected static final String MAPPER_INPUT_VALUE_CLASS =
+    "chain.mapper.input.value.class";
+  protected static final String MAPPER_OUTPUT_KEY_CLASS =
+    "chain.mapper.output.key.class";
+  protected static final String MAPPER_OUTPUT_VALUE_CLASS =
+    "chain.mapper.output.value.class";
+  protected static final String REDUCER_INPUT_KEY_CLASS =
+    "chain.reducer.input.key.class";
+  protected static final String REDUCER_INPUT_VALUE_CLASS =
+    "chain.reducer.input.value.class";
+  protected static final String REDUCER_OUTPUT_KEY_CLASS =
+    "chain.reducer.output.key.class";
+  protected static final String REDUCER_OUTPUT_VALUE_CLASS =
+    "chain.reducer.output.value.class";
+
+  protected boolean isMap;
+
+  @SuppressWarnings("unchecked")
+  private List<Mapper> mappers = new ArrayList<Mapper>();
+  private Reducer<?, ?, ?, ?> reducer;
+  private List<Configuration> confList = new ArrayList<Configuration>();
+  private Configuration rConf;
+  private List<Thread> threads = new ArrayList<Thread>();
+  private Throwable throwable = null;
+
+  /**
+   * Creates a Chain instance configured for a Mapper or a Reducer.
+   *
+   * @param isMap TRUE indicates the chain is for a Mapper, 
+   *              FALSE that is for a Reducer.
+   */
+  protected Chain(boolean isMap) {
+    this.isMap = isMap;
+  }
+
+  static class KeyValuePair<K,V> {
+    K key;
+    V value;
+    boolean endOfInput;
+    
+    KeyValuePair(K key, V value) {
+      this.key = key;
+      this.value = value;
+      this.endOfInput = false;
+    }
+    
+    KeyValuePair(boolean eof) {
+      this.key = null;
+      this.value = null;
+      this.endOfInput = eof;
+    }
+  }
+  
+  // ChainRecordReader either reads from a blocking queue or from the task
+  // context.
+  private static class ChainRecordReader<KEYIN, VALUEIN> 
+      extends RecordReader<KEYIN, VALUEIN> {
+    private Class<?> keyClass;
+    private Class<?> valueClass;
+    private KEYIN key;
+    private VALUEIN value;
+    private Configuration conf;
+    TaskInputOutputContext<KEYIN, VALUEIN, ?, ?> inputContext = null;
+    BlockingQueue<KeyValuePair<KEYIN, VALUEIN>> inputQueue = null;
+    
+    // constructor to read from blocking queue
+    ChainRecordReader(Class<?> keyClass, Class<?> valueClass, 
+        BlockingQueue<KeyValuePair<KEYIN, VALUEIN>> inputQueue,
+        Configuration conf) {
+      this.keyClass = keyClass;
+      this.valueClass = valueClass;
+      this.inputQueue = inputQueue;
+      this.conf = conf;
+    }
+
+    // constructor to read from context
+    ChainRecordReader(TaskInputOutputContext<KEYIN, VALUEIN, ?, ?> context) {
+      inputContext = context;
+    }
+
+    public void initialize(InputSplit split, TaskAttemptContext context) 
+        throws IOException, InterruptedException {
+    }
+
+    /**
+     * Advance to the next key/value pair.
+     * @return true if a key/value pair was read, false at end of input
+     */
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      if (inputQueue != null) {
+        return readFromQueue();
+      } else if (inputContext.nextKeyValue()) {
+        this.key = inputContext.getCurrentKey();
+        this.value = inputContext.getCurrentValue();
+        return true;
+      } else {
+        return false; 
+      }
+    }
+    
+    @SuppressWarnings("unchecked")
+    private boolean readFromQueue() throws IOException, InterruptedException {
+      // if thread was interrupted, exit by throwing InterruptedException 
+      if (Thread.interrupted()) {
+        throw new InterruptedException();
+      }
+      // wait for input on queue
+      KeyValuePair<KEYIN, VALUEIN> kv = inputQueue.take();
+      if (kv.endOfInput) {
+        return false;
+      }
+      key = (KEYIN) ReflectionUtils.newInstance(keyClass, conf);
+      value = (VALUEIN) ReflectionUtils.newInstance(valueClass, conf);
+      ReflectionUtils.copy(conf, kv.key, this.key);
+      ReflectionUtils.copy(conf, kv.value, this.value);
+      return true;
+    }
+
+    /**
+     * Get the current key.
+     * @return the current key object or null if there isn't one
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public KEYIN getCurrentKey() throws IOException, InterruptedException {
+      return this.key;
+    }
+
+    /**
+     * Get the current value.
+     * @return the value object that was read into
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public VALUEIN getCurrentValue() throws IOException, InterruptedException {
+      return this.value;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return 0;
+    }
+  }
+  
+  // ChainRecordWriter either writes to a blocking queue or to the task
+  // context.
+  
+  private static class ChainRecordWriter<KEYOUT, VALUEOUT> 
+      extends RecordWriter<KEYOUT, VALUEOUT> {
+    TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> outputContext = null;
+    BlockingQueue<KeyValuePair<KEYOUT, VALUEOUT>> outputQueue = null;
+    KEYOUT keyout;
+    VALUEOUT valueout;
+    Configuration conf;
+    Class<?> keyClass;
+    Class<?> valueClass;
+    
+    // constructor to write to context
+    ChainRecordWriter(TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
+      outputContext = context;
+    }
+
+    // constructor to write to blocking queue
+    ChainRecordWriter(Class<?> keyClass, Class<?> valueClass, 
+        BlockingQueue<KeyValuePair<KEYOUT, VALUEOUT>> output,
+        Configuration conf) {
+      this.keyClass = keyClass;
+      this.valueClass = valueClass;
+      this.outputQueue = output;
+      this.conf = conf;
+    }
+
+    /** 
+     * Writes a key/value pair.
+     *
+     * @param key the key to write.
+     * @param value the value to write.
+     * @throws IOException
+     */      
+    public void write(KEYOUT key, VALUEOUT value) 
+        throws IOException, InterruptedException {
+      if (outputQueue != null) {
+        writeToQueue(key, value);
+      } else {
+        outputContext.write(key,value);
+      }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void writeToQueue(KEYOUT key, VALUEOUT value)
+        throws IOException, InterruptedException {
+      this.keyout = (KEYOUT) ReflectionUtils.newInstance(keyClass, conf);
+      this.valueout = (VALUEOUT) ReflectionUtils.newInstance(valueClass, conf);
+      ReflectionUtils.copy(conf, key, this.keyout);
+      ReflectionUtils.copy(conf, value, this.valueout);
+      // if thread was interrupted, exit by throwing interrupted exception
+      if (Thread.interrupted()) {
+        throw new InterruptedException();
+      }
+      // wait to write output to the queue
+      outputQueue.put(new KeyValuePair<KEYOUT, VALUEOUT>(keyout, valueout));
+    }
+    /** 
+     * Close this <code>RecordWriter</code> to future operations.
+     * 
+     * @param context the context of the task
+     * @throws IOException
+     */ 
+    public void close(TaskAttemptContext context) 
+        throws IOException, InterruptedException {
+      if (outputQueue != null) {
+        // write end of input 
+        outputQueue.put(new KeyValuePair<KEYOUT, VALUEOUT>(true));
+      }
+    }
+  }
+  
+
+  private static class ChainValueIterator implements RawKeyValueIterator {
+    private RawKeyValueIterator rawIter;
+    private boolean firstTime;
+    private boolean hasFirst; 
+    // the creator of the iterator has already done a call to next() once;
+    // the hasFirst boolean records whether that call found a key/value.
+    public ChainValueIterator(RawKeyValueIterator iter, boolean hasFirst) 
+        throws InterruptedException, IOException {
+      rawIter = iter;
+      this.hasFirst = hasFirst;
+      this.firstTime = true;
+    }
+    
+    public void close() throws IOException {
+      rawIter.close();
+    }
+    
+    public DataInputBuffer getKey() throws IOException {
+      return rawIter.getKey();
+    }
+    
+    public Progress getProgress() {
+      return rawIter.getProgress();
+    }
+    
+    public DataInputBuffer getValue() throws IOException {
+      return rawIter.getValue();
+    }
+    
+    // the first call to next() is ignored and returns the hasFirst value
+    // given in the constructor, because the creator of the iterator
+    // has already called next() once.
+    public boolean next() throws IOException {
+      if (firstTime) {
+        firstTime = false;
+        return hasFirst; 
+      }
+      return rawIter.next();
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static class ChainStatusReporter extends StatusReporter {
+    TaskInputOutputContext context;
+    ChainStatusReporter(TaskInputOutputContext context) {
+      this.context = context;
+    }
+
+    @Override
+    public Counter getCounter(Enum<?> name) {
+      return context.getCounter(name);
+    }
+
+    @Override
+    public Counter getCounter(String group, String name) {
+      return context.getCounter(group, name);
+    }
+
+    @Override
+    public void progress() {
+      context.progress();
+    }
+
+    @Override
+    public void setStatus(String status) {
+      context.setStatus(status);
+    }
+  }
+
+  Throwable getThrowable() {
+    return throwable;
+  }
+
+  private class MapRunner<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
+      extends Thread {
+    private Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapper;
+    private Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context chainContext;
+    private RecordReader<KEYIN, VALUEIN> rr;
+    private RecordWriter<KEYOUT, VALUEOUT> rw;
+
+    public MapRunner(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapper,
+        Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context mapperContext,
+        RecordReader<KEYIN, VALUEIN> rr, RecordWriter<KEYOUT, VALUEOUT> rw)
+        throws IOException, InterruptedException {
+      this.mapper = mapper;
+      this.rr = rr;
+      this.rw = rw;
+      this.chainContext = mapperContext;
+    }
+
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void run() {
+      try {
+        mapper.run(chainContext);
+        rr.close();
+        rw.close(chainContext);
+      } catch (Throwable e) {
+        if (throwable == null) {
+          throwable = e;
+          interruptAllThreads();
+        }
+      }
+    }
+  }
+
+  private class ReduceRunner<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
+      extends Thread {
+    private Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reducer;
+    private Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context chainContext;
+    private RecordWriter<KEYOUT, VALUEOUT> rw;
+
+    public ReduceRunner(
+        Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context,
+        Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reducer,
+        RecordWriter<KEYOUT, VALUEOUT> rw)
+        throws IOException, InterruptedException {
+      this.reducer = reducer;
+      this.chainContext = context;
+      this.rw = rw;
+    }
+
+    @Override
+    public void run() {
+      try {
+        reducer.run(chainContext);
+        rw.close(chainContext);
+      } catch (Throwable ie) {
+        if (throwable == null) {
+          throwable = ie;
+          interruptAllThreads();
+        }
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static final Constructor<Mapper.Context> mapContextConstructor;
+  static {
+    try {
+      mapContextConstructor = Mapper.Context.class.getConstructor
+      (new Class[]{Mapper.class,
+                   Configuration.class,
+                   TaskAttemptID.class,
+                   RecordReader.class,
+                   RecordWriter.class,
+                   OutputCommitter.class,
+                   StatusReporter.class,
+                   InputSplit.class});
+    } catch (NoSuchMethodException nme) {
+      throw new IllegalArgumentException("Can't find constructor");
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  private static final Constructor<Reducer.Context> reduceContextConstructor;
+  static {
+    try {
+      reduceContextConstructor = Reducer.Context.class.getConstructor
+        (new Class[]{Reducer.class,
+            Configuration.class,
+            TaskAttemptID.class,
+            RawKeyValueIterator.class,
+            Counter.class,
+            RecordWriter.class,
+            OutputCommitter.class,
+            StatusReporter.class,
+            RawComparator.class,
+            Class.class,
+            Class.class});
+    } catch (NoSuchMethodException nme) {
+      throw new IllegalArgumentException("Can't find constructor");
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private <INKEY,INVALUE,OUTKEY,OUTVALUE> 
+    Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
+      createMapContext(RecordReader<INKEY,INVALUE> rr,
+        RecordWriter<OUTKEY,OUTVALUE> rw,
+        TaskInputOutputContext context, InputSplit split, int index) 
+      throws IOException {
+    try {
+      Configuration conf = getConf(index);
+      Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper = mappers.get(index);
+      return mapContextConstructor.newInstance(mapper, conf,
+        context.getTaskAttemptID(), rr, rw,
+        context.getOutputCommitter(), new ChainStatusReporter(context), split);
+    } catch (InstantiationException e) {
+      throw new IOException("Can't create Context", e);
+    } catch (InvocationTargetException e) {
+      throw new IOException("Can't invoke Context constructor", e);
+    } catch (IllegalAccessException e) {
+      throw new IOException("Can't invoke Context constructor", e);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private <INKEY,INVALUE,OUTKEY,OUTVALUE> 
+      Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
+      createReduceContext(TaskInputOutputContext context,
+        RecordWriter<OUTKEY,OUTVALUE> output) 
+      throws IOException, InterruptedException {
+    try {
+      Class<?> keyClass = 
+        rConf.getClass(REDUCER_INPUT_KEY_CLASS, Object.class);
+      Class<?> valueClass = 
+        rConf.getClass(REDUCER_INPUT_VALUE_CLASS, Object.class);
+
+      return reduceContextConstructor.newInstance(reducer, rConf,
+        context.getTaskAttemptID(), 
+        new ChainValueIterator(((ReduceContext)context).getIterator(),
+          ((ReduceContext)context).hasMore()),
+        ((ReduceContext)context).getInputCounter(), output,
+        context.getOutputCommitter(), new ChainStatusReporter(context),
+        ((ReduceContext)context).getComparator(), keyClass, valueClass);
+    } catch (InstantiationException e) {
+      throw new IOException("Can't create Context", e);
+    } catch (InvocationTargetException e) {
+      throw new IOException("Can't invoke Context constructor", e);
+    } catch (IllegalAccessException e) {
+      throw new IOException("Can't invoke Context constructor", e);
+    }
+  }
+
+  Configuration getConf(int index) {
+    return confList.get(index);
+  }
+
+  // Run the mapper directly.
+  // wrapping the context is required here to pass user-given
+  // mapper configuration in the context
+  @SuppressWarnings("unchecked")
+  void runMapper(TaskInputOutputContext context, int index) 
+      throws IOException, InterruptedException { 
+    Mapper mapper = mappers.get(index);
+    RecordReader rr = new ChainRecordReader(context);
+    RecordWriter rw = new ChainRecordWriter(context);
+    Mapper.Context mapperContext = createMapContext(rr, rw, context,
+      ((MapContext)context).getInputSplit(), index);
+    mapper.run(mapperContext);
+    rr.close();
+    rw.close(context);
+  }
+
+  // start the first mapper in a thread;
+  // it reads input from the task context and writes output to a queue
+  @SuppressWarnings("unchecked")
+  void startMapper(TaskInputOutputContext inputContext,
+      BlockingQueue<KeyValuePair<?, ?>> output, int index) 
+      throws IOException, InterruptedException { 
+    Configuration conf = getConf(index);
+    Class<?> keyOutClass = 
+      conf.getClass(MAPPER_OUTPUT_KEY_CLASS, Object.class);
+    Class<?> valueOutClass = 
+      conf.getClass(MAPPER_OUTPUT_VALUE_CLASS, Object.class);
+
+    RecordReader rr = new ChainRecordReader(inputContext);
+    RecordWriter rw = new ChainRecordWriter(keyOutClass,
+      valueOutClass, output, conf);
+    Mapper.Context mapperContext = createMapContext(rr, rw, inputContext,
+      ((MapContext)inputContext).getInputSplit(), index);
+    MapRunner runner = 
+      new MapRunner(mappers.get(index), mapperContext, rr, rw);
+    threads.add(runner);
+    runner.start();
+  }
+  
+  // start the last mapper in a thread;
+  // it reads input from a queue and writes output to the task context
+  @SuppressWarnings("unchecked")
+  void startMapper(BlockingQueue<KeyValuePair<?, ?>> input,
+      TaskInputOutputContext outputContext, int index)
+      throws IOException, InterruptedException { 
+    Configuration conf = getConf(index);
+    Class<?> keyClass = conf.getClass(MAPPER_INPUT_KEY_CLASS, Object.class);
+    Class<?> valueClass = 
+      conf.getClass(MAPPER_INPUT_VALUE_CLASS, Object.class);
+    RecordReader rr = new ChainRecordReader(keyClass, valueClass, input, conf);
+    RecordWriter rw = new ChainRecordWriter(outputContext);
+    MapRunner runner = new MapRunner(mappers.get(index),
+      createMapContext(rr, rw, outputContext, null, index), rr, rw);
+    threads.add(runner);
+    runner.start();
+  }
+
+  // start mapper in a thread
+  // this mapper reads input from queue and writes output to queue
+  @SuppressWarnings("unchecked")
+  void startMapper(BlockingQueue<KeyValuePair<?, ?>> input, 
+      BlockingQueue<KeyValuePair<?, ?>> output,
+      TaskInputOutputContext context, int index)
+      throws IOException, InterruptedException { 
+    Configuration conf = getConf(index);
+    Class<?> keyClass = conf.getClass(MAPPER_INPUT_KEY_CLASS, Object.class);
+    Class<?> valueClass = 
+      conf.getClass(MAPPER_INPUT_VALUE_CLASS, Object.class);
+    Class<?> keyOutClass = 
+      conf.getClass(MAPPER_OUTPUT_KEY_CLASS, Object.class);
+    Class<?> valueOutClass = 
+      conf.getClass(MAPPER_OUTPUT_VALUE_CLASS, Object.class);
+    RecordReader rr = new ChainRecordReader(keyClass, valueClass, input, conf);
+    RecordWriter rw = new ChainRecordWriter(keyOutClass,
+      valueOutClass, output, conf);
+    MapRunner runner = new MapRunner(mappers.get(index),
+      createMapContext(rr, rw, context, null, index), rr, rw);
+    threads.add(runner);
+    runner.start();
+  }
+
+  // Run the reducer directly.
+  // wrapping the context is required here to pass user-given
+  // reducer configuration in the context
+  @SuppressWarnings("unchecked")
+  <KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runReducer(
+      TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context) 
+      throws IOException, InterruptedException { 
+    RecordWriter<KEYOUT, VALUEOUT> rw = 
+      new ChainRecordWriter<KEYOUT, VALUEOUT>(context);
+    Reducer.Context reducerContext = 
+      createReduceContext(context, rw);
+    reducer.run(reducerContext);
+    rw.close(context);
+  }
+
+  // start reducer in a thread 
+  // reducer reads input from task context and writes output to output queue
+  @SuppressWarnings("unchecked")
+  void startReducer(TaskInputOutputContext inputContext,
+      BlockingQueue<KeyValuePair<?, ?>> outputQueue) 
+      throws IOException, InterruptedException { 
+
+    Class<?> keyOutClass = 
+      rConf.getClass(REDUCER_OUTPUT_KEY_CLASS, Object.class);
+    Class<?> valueOutClass = 
+      rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS, Object.class);
+    RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass,
+      outputQueue, rConf);
+    Reducer.Context reducerContext = createReduceContext(inputContext, rw);
+    
+    ReduceRunner runner = new ReduceRunner(reducerContext, reducer, rw);
+    threads.add(runner);
+    runner.start();
+  }
+
+  // join all threads. If one of the threads threw, rethrow its Throwable
+  // (wrapped in a RuntimeException when necessary) to make the task fail
+  void joinAllThreads() 
+      throws IOException, InterruptedException {
+    for (Thread thread :threads) {
+      thread.join();
+    }
+    if (throwable != null) {
+      if (throwable instanceof IOException) {
+        throw (IOException) throwable;
+      } else if (throwable instanceof InterruptedException) {
+        throw (InterruptedException) throwable;
+      } else {
+        throw new RuntimeException(throwable);
+      }
+    }
+  }
+
+  // interrupt all threads
+  private void interruptAllThreads() {
+    for (Thread th : threads) {
+      th.interrupt();
+    }
+  }
+
+  /**
+   * Returns the prefix to use for the configuration of the chain depending
+   * if it is for a Mapper or a Reducer.
+   *
+   * @param isMap TRUE for Mapper, FALSE for Reducer.
+   * @return the prefix to use.
+   */
+  protected static String getPrefix(boolean isMap) {
+    return (isMap) ? CHAIN_MAPPER : CHAIN_REDUCER;
+  }
+
+  protected static int getIndex(Configuration conf, String prefix) {
+    return conf.getInt(prefix + CHAIN_MAPPER_SIZE, 0); 
+  }
+  
+  /**
+   * Creates a {@link Configuration} for the Map or Reduce in the chain.
+   * 
+   * <p>
+   * It creates a new Configuration using the chain job's Configuration as
+   * base and adds to it the configuration properties for the chain element.
+   * The keys of the chain element Configuration have precedence over the
+   * given Configuration.
+   * </p>
+   *
+   * @param jobConf the chain job's Configuration.
+   * @param confKey the key for chain element configuration serialized in the
+   *                chain job's Configuration.
+   * @return a new Configuration aggregating the chain job's Configuration 
+   *         with the chain element configuration properties.
+   */
+  protected static Configuration getChainElementConf(Configuration jobConf,
+      String confKey) {
+    Configuration conf = null;
+    try {
+      Stringifier<Configuration> stringifier =
+        new DefaultStringifier<Configuration>(jobConf, Configuration.class);
+      String confString = jobConf.get(confKey, null);
+      if (confString != null) {
+        conf = stringifier.fromString(confString);
+      }
+    } catch (IOException ioex) {
+      throw new RuntimeException(ioex);
+    }
+    // we have to do this because the Writable deserialization clears all
+    // values set in the conf, making it impossible to do a
+    // new Configuration(jobConf) in the creation of the conf above
+    jobConf = new Configuration(jobConf);
+
+    if (conf != null) {
+      for(Map.Entry<String, String> entry : conf) {
+        jobConf.set(entry.getKey(), entry.getValue());
+      }
+    }
+    return jobConf;
+  }
+
+  /**
+   * Adds a Mapper class to the chain job.
+   * 
+   * <p/>
+   * The configuration properties of the chain job have precedence over the
+   * configuration properties of the Mapper.
+   *
+   * @param isMap            indicates if the Chain is for a Mapper or for a
+   * Reducer.
+   * @param job              chain job.
+   * @param klass            the Mapper class to add.
+   * @param inputKeyClass    mapper input key class.
+   * @param inputValueClass  mapper input value class.
+   * @param outputKeyClass   mapper output key class.
+   * @param outputValueClass mapper output value class.
+   * @param mapperConf       a configuration for the Mapper class.
+   * It is recommended to use a Configuration without default values using the
+   * <code>Configuration(boolean loadDefaults)</code> constructor with FALSE.
+   */
+  protected static void addMapper(boolean isMap, Job job,
+      Class<? extends Mapper> klass, Class<?> inputKeyClass,
+      Class<?> inputValueClass, Class<?> outputKeyClass,
+      Class<?> outputValueClass, Configuration mapperConf) {
+    String prefix = getPrefix(isMap);
+    Configuration jobConf = job.getConfiguration();
+
+    // if this is a reducer chain, check that the Reducer has already been set
+    checkReducerAlreadySet(isMap, jobConf, prefix, true);
+    
+    // set the mapper class
+    int index = getIndex(jobConf, prefix);
+    jobConf.setClass(prefix + CHAIN_MAPPER_CLASS + index, klass, Mapper.class);
+    
+    validateKeyValueTypes(isMap, jobConf, inputKeyClass, inputValueClass,
+      outputKeyClass, outputValueClass, index, prefix);
+    
+    setMapperConf(isMap, jobConf, inputKeyClass, inputValueClass,
+      outputKeyClass, outputValueClass, mapperConf, index, prefix);
+  }
+
+  // for a reducer chain, verify that the Reducer has already been set
+  // (shouldSet == true) or that it has not been set yet (shouldSet == false)
+  protected static void checkReducerAlreadySet(boolean isMap,
+      Configuration jobConf, String prefix, boolean shouldSet) {
+    if (!isMap) {
+      if (shouldSet) {
+        if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null) == null) {
+          throw new IllegalStateException(
+            "A Mapper can be added to the chain only after the Reducer has " +
+            "been set");
+        }
+      } else {
+        if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null) != null) {
+          throw new IllegalStateException("Reducer has already been set");
+        }
+      }
+    }
+  }
+  
+  protected static void validateKeyValueTypes(boolean isMap,
+      Configuration jobConf, Class<?> inputKeyClass, Class<?> inputValueClass,
+      Class<?> outputKeyClass, Class<?> outputValueClass, int index,
+      String prefix) {
+    // if it is a reducer chain and the first Mapper is being added check the
+    // key and value input classes of the mapper match those of the reducer
+    // output.
+    if (!isMap && index == 0) {
+      Configuration reducerConf =
+        getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG);
+      if (! inputKeyClass.isAssignableFrom(
+        reducerConf.getClass(REDUCER_OUTPUT_KEY_CLASS, null))) {
+        throw new IllegalArgumentException("The Reducer output key class does" +
+          " not match the Mapper input key class");
+      }
+      if (! inputValueClass.isAssignableFrom(
+        reducerConf.getClass(REDUCER_OUTPUT_VALUE_CLASS, null))) {
+        throw new IllegalArgumentException("The Reducer output value class" +
+          " does not match the Mapper input value class");
+      }
+    } else if (index > 0) {
+      // check that the new Mapper in the chain's key and value input classes
+      // match those of the previous Mapper's output.
+      Configuration previousMapperConf =
+        getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG +
+          (index - 1));
+      if (! inputKeyClass.isAssignableFrom(
+        previousMapperConf.getClass(MAPPER_OUTPUT_KEY_CLASS, null))) {
+        throw new IllegalArgumentException("The Mapper input key class does" +
+          " not match the previous Mapper output key class");
+      }
+      if (! inputValueClass.isAssignableFrom(
+        previousMapperConf.getClass(MAPPER_OUTPUT_VALUE_CLASS, null))) {
+        throw new IllegalArgumentException("The Mapper input value class" +
+          " does not match the previous Mapper output value class");
+      }
+    }
+  }
+
+  protected static void setMapperConf(boolean isMap, Configuration jobConf,
+      Class<?> inputKeyClass, Class<?> inputValueClass,
+      Class<?> outputKeyClass, Class<?> outputValueClass,
+      Configuration mapperConf, int index, String prefix) {
+    // if the Mapper does not have a configuration, create an empty one
+    if (mapperConf == null) {
+      // using a Configuration without defaults to make it lightweight.
+      // still the chain's conf may have all defaults and this conf is
+      // overlapped to the chain configuration one.
+      mapperConf = new Configuration(false);
+    }
+
+    // store the input/output classes of the mapper in the mapper conf
+    mapperConf.setClass(MAPPER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
+    mapperConf.setClass(MAPPER_INPUT_VALUE_CLASS, inputValueClass,
+                        Object.class);
+    mapperConf.setClass(MAPPER_OUTPUT_KEY_CLASS, outputKeyClass, Object.class);
+    mapperConf.setClass(MAPPER_OUTPUT_VALUE_CLASS, outputValueClass,
+                        Object.class);
+    // serialize the mapper configuration in the chain configuration.
+    Stringifier<Configuration> stringifier =
+      new DefaultStringifier<Configuration>(jobConf, Configuration.class);
+    try {
+      jobConf.set(prefix + CHAIN_MAPPER_CONFIG + index,
+                  stringifier.toString(new Configuration(mapperConf)));
+    }
+    catch (IOException ioEx) {
+      throw new RuntimeException(ioEx);
+    }
+
+    // increment the chain counter
+    jobConf.setInt(prefix + CHAIN_MAPPER_SIZE, index + 1);
+  }
+  
+  /**
+   * Sets the Reducer class to the chain job.
+   * 
+   * <p/>
+   * The configuration properties of the chain job have precedence over the
+   * configuration properties of the Reducer.
+   *
+   * @param job              the chain job.
+   * @param klass            the Reducer class to add.
+   * @param inputKeyClass    reducer input key class.
+   * @param inputValueClass  reducer input value class.
+   * @param outputKeyClass   reducer output key class.
+   * @param outputValueClass reducer output value class.
+   * @param reducerConf      a configuration for the Reducer class.
+   * It is recommended to use a Configuration without default values using the
+   * <code>Configuration(boolean loadDefaults)</code> constructor with FALSE.
+   */
+  protected static void setReducer(Job job, Class<? extends Reducer> klass,
+      Class<?> inputKeyClass, Class<?> inputValueClass,
+      Class<?> outputKeyClass, Class<?> outputValueClass,
+      Configuration reducerConf) {
+    String prefix = getPrefix(false);
+    Configuration jobConf = job.getConfiguration();
+    checkReducerAlreadySet(false, jobConf, prefix, false);
+
+    jobConf.setClass(prefix + CHAIN_REDUCER_CLASS, klass, Reducer.class);
+    
+    setReducerConf(jobConf, inputKeyClass, inputValueClass, outputKeyClass,
+      outputValueClass, reducerConf, prefix);
+  }
+
+  protected static void setReducerConf(Configuration jobConf, 
+      Class<?> inputKeyClass, Class<?> inputValueClass, 
+      Class<?> outputKeyClass, Class<?> outputValueClass,
+      Configuration reducerConf, String prefix) {
+    // if the Reducer does not have a Configuration, create an empty one
+    if (reducerConf == null) {
+      // using a Configuration without defaults to make it lightweight.
+      // still the chain's conf may have all defaults and this conf is
+      // overlapped to the chain's Configuration one.
+      reducerConf = new Configuration(false);
+    }
+
+    // store the input/output classes of the reducer in 
+    // the reducer configuration 
+    reducerConf.setClass(REDUCER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
+    reducerConf.setClass(REDUCER_INPUT_VALUE_CLASS, inputValueClass,
+                         Object.class);
+    reducerConf.setClass(REDUCER_OUTPUT_KEY_CLASS, outputKeyClass,
+                         Object.class);
+    reducerConf.setClass(REDUCER_OUTPUT_VALUE_CLASS, outputValueClass,
+                         Object.class);
+
+    // serialize the reducer configuration in the chain's configuration.
+    Stringifier<Configuration> stringifier =
+      new DefaultStringifier<Configuration>(jobConf, Configuration.class);
+    try {
+      jobConf.set(prefix + CHAIN_REDUCER_CONFIG,
+        stringifier.toString(new Configuration(reducerConf)));
+    }
+    catch (IOException ioEx) {
+      throw new RuntimeException(ioEx);
+    }
+  }
+  
+  /**
+   * Setup the chain.
+   *
+   * @param jobConf chain job's {@link Configuration}.
+   */
+  @SuppressWarnings("unchecked")
+  void setup(Configuration jobConf) {
+    String prefix = getPrefix(isMap);
+    
+    int index = jobConf.getInt(prefix + CHAIN_MAPPER_SIZE, 0);
+    for (int i = 0; i < index; i++) {
+      Class<? extends Mapper> klass =
+        jobConf.getClass(prefix + CHAIN_MAPPER_CLASS + i, null, Mapper.class);
+      Configuration mConf =
+        getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG + i);
+      confList.add(mConf);
+      Mapper mapper = ReflectionUtils.newInstance(klass, mConf);
+      mappers.add(mapper);
+
+    }
+    
+    Class<? extends Reducer> klass =
+      jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null, Reducer.class);
+    if (klass != null) {
+      rConf = getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG);
+      reducer = ReflectionUtils.newInstance(klass, rConf);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  List<Mapper> getAllMappers() {
+    return mappers;
+  }
+  
+  /**
+   * Returns the Reducer instance in the chain.
+   *
+   * @return the Reducer instance in the chain or NULL if none.
+   */
+  Reducer<?, ?, ?, ?> getReducer() {
+    return reducer;
+  }
+}
\ No newline at end of file

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapper.java?rev=803019&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapper.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapper.java Tue Aug 11 07:56:49 2009
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.chain;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * The ChainMapper class allows the use of multiple Mapper classes within a
+ * single Map task.
+ * 
+ * <p>
+ * The Mapper classes are invoked in a chained (or piped) fashion: the output
+ * of the first becomes the input of the second, and so on until the last
+ * Mapper; the output of the last Mapper will be written to the task's output.
+ * </p>
+ * <p>
+ * The key functionality of this feature is that the Mappers in the chain do 
+ * not need to be aware that they are executed in a chain. This enables having
+ * reusable specialized Mappers that can be combined to perform composite
+ * operations within a single task.
+ * </p>
+ * <p>
+ * Special care has to be taken when creating chains so that the key/values
+ * output by a Mapper are valid for the following Mapper in the chain. It is
+ * assumed all Mappers and the Reducer in the chain use matching output and
+ * input key and value classes, as no conversion is done by the chaining code.
+ * </p>
+ * <p>
+ * Using the ChainMapper and the ChainReducer classes it is possible to
+ * compose Map/Reduce jobs that look like <code>[MAP+ / REDUCE MAP*]</code>.
+ * An immediate benefit of this pattern is a dramatic reduction in disk IO.
+ * </p>
+ * <p>
+ * IMPORTANT: There is no need to specify the output key/value classes for the
+ * ChainMapper; this is done by the addMapper call for the last mapper in the
+ * chain.
+ * </p>
+ * ChainMapper usage pattern:
+ * <p/>
+ * <pre>
+ * ...
+ * Job job = new Job(conf);
+ * <p/>
+ * Configuration mapAConf = new Configuration(false);
+ * ...
+ * ChainMapper.addMapper(job, AMap.class, LongWritable.class, Text.class,
+ *   Text.class, Text.class, mapAConf);
+ * <p/>
+ * Configuration mapBConf = new Configuration(false);
+ * ...
+ * ChainMapper.addMapper(job, BMap.class, Text.class, Text.class,
+ *   LongWritable.class, Text.class, mapBConf);
+ * <p/>
+ * ...
+ * <p/>
+ * job.waitForCompletion(true);
+ * ...
+ * </pre>
+ */
+public class ChainMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
+    extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+
+  /**
+   * Adds a {@link Mapper} class to the chain mapper.
+   * 
+   * <p>
+   * The key and values are passed from one element of the chain to the next,
+   * by value.
+   * For the added Mapper, the configuration given for it,
+   * <code>mapperConf</code>, has precedence over the job's Configuration.
+   * This precedence is in effect when the task is running.
+   * </p>
+   * <p>
+   * IMPORTANT: There is no need to specify the output key/value classes for
+   * the ChainMapper; they are set by the addMapper call for the last mapper
+   * in the chain.
+   * </p>
+   *
+   * @param job              The job.
+   * @param klass            the Mapper class to add.
+   * @param inputKeyClass    mapper input key class.
+   * @param inputValueClass  mapper input value class.
+   * @param outputKeyClass   mapper output key class.
+   * @param outputValueClass mapper output value class.
+   * @param mapperConf       a configuration for the Mapper class. 
+   * It is recommended to use a Configuration without default values,
+   * created with the <code>Configuration(boolean loadDefaults)</code>
+   * constructor, passing <code>false</code>.
+   */
+  public static void addMapper(Job job, Class<? extends Mapper> klass,
+      Class<?> inputKeyClass, Class<?> inputValueClass,
+      Class<?> outputKeyClass, Class<?> outputValueClass,
+      Configuration mapperConf) throws IOException {
+    job.setMapperClass(ChainMapper.class);
+    job.setMapOutputKeyClass(outputKeyClass);
+    job.setMapOutputValueClass(outputValueClass);
+    Chain.addMapper(true, job, klass, inputKeyClass, inputValueClass,
+      outputKeyClass, outputValueClass, mapperConf);
+  }
+
+  private Chain chain;
+
+  protected void setup(Context context) {
+    chain = new Chain(true);
+    chain.setup(context.getConfiguration());
+  }
+  
+  public void run(Context context) 
+      throws IOException, InterruptedException {
+    
+    setup(context);
+
+    int numMappers = chain.getAllMappers().size();
+    if (numMappers == 0) {
+      return;
+    }
+    
+    BlockingQueue<Chain.KeyValuePair<?, ?>> inputQueue;
+    BlockingQueue<Chain.KeyValuePair<?, ?>> outputQueue;
+    if (numMappers == 1) {
+      chain.runMapper(context, 0);
+    } else {
+      // start all the mappers on different threads, connected by single-slot
+      // queues; the first mapper reads from the task's input
+      outputQueue = new LinkedBlockingQueue<Chain.KeyValuePair<?, ?>>(1);
+      chain.startMapper(context, outputQueue, 0);
+      // start the intermediate mappers
+      for (int i = 1; i < numMappers - 1; i++) {
+        inputQueue = outputQueue;
+        outputQueue = new LinkedBlockingQueue<Chain.KeyValuePair<?, ?>>(1);
+        chain.startMapper(inputQueue, outputQueue, context, i);
+      }
+      // start the last mapper, which writes to the task's output
+      chain.startMapper(outputQueue, context, numMappers - 1);
+    }
+    // wait for all threads
+    chain.joinAllThreads();
+  }
+}

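A note on the threading model in run() above: consecutive mappers are connected
by LinkedBlockingQueue instances of capacity one, so a thread can hand off a
record only after the downstream mapper has taken the previous one, keeping at
most one record in flight per link. The standalone sketch below shows that
hand-off pattern in isolation; QueueHandOffSketch, the upstream/downstream
threads and the <EOF> sentinel are illustrative stand-ins, not the Chain
internals.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueHandOffSketch {

  public static void main(String[] args) throws InterruptedException {
    // capacity 1, as in run(): at most one record in flight between
    // two chain elements at any time
    final BlockingQueue<String> link = new LinkedBlockingQueue<String>(1);
    final String eof = "<EOF>"; // illustrative sentinel, not Chain's protocol

    Thread upstream = new Thread(new Runnable() {
      public void run() {
        try {
          for (String record : new String[] { "a", "b", "c" }) {
            link.put(record); // blocks until the single slot is free
          }
          link.put(eof);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        }
      }
    });

    Thread downstream = new Thread(new Runnable() {
      public void run() {
        try {
          String record;
          // take() blocks until the upstream thread has put a record
          while (!(record = link.take()).equals(eof)) {
            System.out.println("consumed " + record);
          }
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        }
      }
    });

    upstream.start();
    downstream.start();
    upstream.join(); // analogous to chain.joinAllThreads()
    downstream.join();
  }
}
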
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReducer.java?rev=803019&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReducer.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReducer.java Tue Aug 11 07:56:49 2009
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.chain;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * The ChainReducer class allows the chaining of multiple Mapper classes after
+ * a Reducer within the Reducer task.
+ * 
+ * <p>
+ * For each record output by the Reducer, the Mapper classes are invoked in a
+ * chained (or piped) fashion. The output of the Reducer becomes the input of
+ * the first Mapper, the output of the first becomes the input of the second,
+ * and so on until the last Mapper, whose output is written to the task's
+ * output.
+ * </p> 
+ * <p>
+ * The key functionality of this feature is that the Mappers in the chain do 
+ * not need to be aware that they are executed after the Reducer or in a chain.
+ * This enables having reusable specialized Mappers that can be combined to
+ * perform composite operations within a single task.
+ * </p>
+ * <p>
+ * Special care has to be taken when creating chains to ensure that the
+ * key/values output by a Mapper are valid for the following Mapper in the
+ * chain. It is assumed that all Mappers and the Reducer in the chain use
+ * matching output and input key and value classes, as no conversion is done
+ * by the chaining code.
+ * </p>
+ * <p>
+ * Using the ChainMapper and the ChainReducer classes, it is possible to
+ * compose Map/Reduce jobs that look like <code>[MAP+ / REDUCE MAP*]</code>.
+ * An immediate benefit of this pattern is a dramatic reduction in disk IO.
+ * </p>
+ * <p>
+ * IMPORTANT: There is no need to specify the output key/value classes for the
+ * ChainReducer; they are set by the setReducer or the addMapper call for the
+ * last element in the chain.
+ * </p>
+ * ChainReducer usage pattern:
+ *
+ * <pre>
+ * ...
+ * Job job = new Job(conf);
+ * ...
+ *
+ * Configuration reduceConf = new Configuration(false);
+ * ...
+ * ChainReducer.setReducer(job, XReduce.class, LongWritable.class, Text.class,
+ *   Text.class, Text.class, reduceConf);
+ *
+ * ChainReducer.addMapper(job, CMap.class, Text.class, Text.class,
+ *   LongWritable.class, Text.class, null);
+ *
+ * ChainReducer.addMapper(job, DMap.class, LongWritable.class, Text.class,
+ *   LongWritable.class, LongWritable.class, null);
+ *
+ * ...
+ *
+ * job.waitForCompletion(true);
+ * ...
+ * </pre>
+ */
+public class ChainReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
+    extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+
+  /**
+   * Sets the {@link Reducer} class for the chain job.
+   * 
+   * <p>
+   * The key and values are passed from one element of the chain to the next,
+   * by value. 
+   * For the added Reducer, the configuration given for it,
+   * <code>reducerConf</code>, has precedence over the job's Configuration.
+   * This precedence is in effect when the task is running.
+   * </p>
+   * <p>
+   * IMPORTANT: There is no need to specify the output key/value classes for 
+   * the ChainReducer; they are set by the setReducer or the addMapper call
+   * for the last element in the chain.
+   * </p>
+   *
+   * @param job              the job 
+   * @param klass            the Reducer class to add.
+   * @param inputKeyClass    reducer input key class.
+   * @param inputValueClass  reducer input value class.
+   * @param outputKeyClass   reducer output key class.
+   * @param outputValueClass reducer output value class.
+   * @param reducerConf      a configuration for the Reducer class.
+   * It is recommended to use a Configuration without default values,
+   * created with the <code>Configuration(boolean loadDefaults)</code>
+   * constructor, passing <code>false</code>.
+   */
+  public static void setReducer(Job job, Class<? extends Reducer> klass,
+      Class<?> inputKeyClass, Class<?> inputValueClass,
+      Class<?> outputKeyClass, Class<?> outputValueClass,
+      Configuration reducerConf) {
+    job.setReducerClass(ChainReducer.class);
+    job.setOutputKeyClass(outputKeyClass);
+    job.setOutputValueClass(outputValueClass);
+    Chain.setReducer(job, klass, inputKeyClass, inputValueClass,
+      outputKeyClass, outputValueClass, reducerConf);
+  }
+
+  /**
+   * Adds a {@link Mapper} class to the chain reducer.
+   * 
+   * <p>
+   * The key and values are passed from one element of the chain to the next,
+   * by value.
+   * For the added Mapper, the configuration given for it,
+   * <code>mapperConf</code>, has precedence over the job's Configuration.
+   * This precedence is in effect when the task is running.
+   * </p>
+   * <p>
+   * IMPORTANT: There is no need to specify the output key/value classes for
+   * the ChainReducer; they are set by the setReducer or the addMapper call
+   * for the last element in the chain.
+   * </p>
+   *
+   * @param job              The job.
+   * @param klass            the Mapper class to add.
+   * @param inputKeyClass    mapper input key class.
+   * @param inputValueClass  mapper input value class.
+   * @param outputKeyClass   mapper output key class.
+   * @param outputValueClass mapper output value class.
+   * @param mapperConf       a configuration for the Mapper class. 
+   * It is recommended to use a Configuration without default values,
+   * created with the <code>Configuration(boolean loadDefaults)</code>
+   * constructor, passing <code>false</code>.
+   */
+  public static void addMapper(Job job, Class<? extends Mapper> klass,
+      Class<?> inputKeyClass, Class<?> inputValueClass,
+      Class<?> outputKeyClass, Class<?> outputValueClass,
+      Configuration mapperConf) throws IOException {
+    job.setOutputKeyClass(outputKeyClass);
+    job.setOutputValueClass(outputValueClass);
+    Chain.addMapper(false, job, klass, inputKeyClass, inputValueClass,
+                    outputKeyClass, outputValueClass, mapperConf);
+  }
+
+  private Chain chain;
+
+  protected void setup(Context context) {
+    chain = new Chain(false);
+    chain.setup(context.getConfiguration());
+  }
+
+  public void run(Context context) 
+      throws IOException, InterruptedException {
+    setup(context);
+
+    // if no reducer is set, just do nothing
+    if (chain.getReducer() == null) {
+      return;
+    }
+    int numMappers = chain.getAllMappers().size();
+    // if there are no mappers in chain, run the reducer
+    if (numMappers == 0) {
+      chain.runReducer(context);
+      return;
+    }
+
+    // start the reducer and all mappers on different threads, connected by
+    // single-slot queues
+    BlockingQueue<Chain.KeyValuePair<?, ?>> inputQueue;
+    BlockingQueue<Chain.KeyValuePair<?, ?>> outputQueue;
+    // start the reducer, which feeds the first mapper in the chain
+    outputQueue = new LinkedBlockingQueue<Chain.KeyValuePair<?, ?>>(1);
+    chain.startReducer(context, outputQueue);
+    // start all mappers except the last one
+    for (int i = 0; i < numMappers - 1; i++) {
+      inputQueue = outputQueue;
+      outputQueue = new LinkedBlockingQueue<Chain.KeyValuePair<?, ?>>(1);
+      chain.startMapper(inputQueue, outputQueue, context, i);
+    }
+    // start the last mapper, which writes to the task's output
+    chain.startMapper(outputQueue, context, numMappers - 1);
+
+    // wait for all threads
+    chain.joinAllThreads();
+  }
+}

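Taken together, ChainMapper and ChainReducer compose jobs of the shape
[MAP+ / REDUCE MAP*] within single map and reduce tasks. Below is a minimal
end-to-end driver sketch mirroring the word-count test added in this commit;
it assumes the stock TokenCounterMapper, IntSumReducer and InverseMapper
library classes, and ChainDriverSketch itself is hypothetical, not part of
the commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

public class ChainDriverSketch {

  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration());
    job.setJobName("chained wordcount");

    // MAP+: tokenize and count; no output classes are set on the job here,
    // the last addMapper/setReducer calls take care of that
    ChainMapper.addMapper(job, TokenCounterMapper.class, Object.class,
        Text.class, Text.class, IntWritable.class, null);

    // REDUCE: sum the counts per token
    ChainReducer.setReducer(job, IntSumReducer.class, Text.class,
        IntWritable.class, Text.class, IntWritable.class, null);

    // MAP*: swap key and value after the reduce, in the same reduce task
    ChainReducer.addMapper(job, InverseMapper.class, Text.class,
        IntWritable.class, IntWritable.class, Text.class, null);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
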
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestChainErrors.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestChainErrors.java?rev=803019&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestChainErrors.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestChainErrors.java Tue Aug 11 07:56:49 2009
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.chain;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.IOException;
+
+public class TestChainErrors extends HadoopTestCase {
+
+  private static String localPathRoot = 
+    System.getProperty("test.build.data", "/tmp");
+
+  private Path inDir = new Path(localPathRoot, "testing/chain/input");
+  private Path outDir = new Path(localPathRoot, "testing/chain/output");
+  private String input = "a\nb\nc\nd\n";
+
+  public TestChainErrors() throws IOException {
+    super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
+  }
+
+  // tests that invalid chain compositions are rejected at submission time
+  // with an IllegalArgumentException
+  public void testChainSubmission() throws Exception {
+
+    Configuration conf = createJobConf();
+
+    Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 1, input);
+    job.setJobName("chain");
+
+    Throwable th = null;
+    try {
+      ChainMapper.addMapper(job, CMap.class, LongWritable.class, Text.class,
+        IntWritable.class, Text.class, null);
+      ChainMapper.addMapper(job, BMap.class, LongWritable.class, Text.class,
+        LongWritable.class, Text.class, null);
+    } catch (IllegalArgumentException iae) {
+      th = iae;
+    }
+    assertNotNull("IllegalArgumentException expected", th);
+
+    th = null;
+    try {
+      ChainReducer.setReducer(job, CReduce.class, LongWritable.class,
+        Text.class, IntWritable.class, Text.class, null);
+      ChainMapper.addMapper(job, AMap.class, LongWritable.class, Text.class,
+        LongWritable.class, Text.class, null);
+    } catch (IllegalArgumentException iae) {
+      th = iae;
+    }
+    assertNotNull("IllegalArgumentException expected", th);
+  }
+  
+  // tests that the job fails when a chained mapper throws an exception
+  public void testChainFail() throws Exception {
+
+    Configuration conf = createJobConf();
+
+    Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 0, input);
+    job.setJobName("chain");
+
+    ChainMapper.addMapper(job, BMap.class, LongWritable.class, Text.class,
+      LongWritable.class, Text.class, null);
+   
+    ChainMapper.addMapper(job, CMap.class, LongWritable.class, Text.class,
+      IntWritable.class, Text.class, null);
+    
+    job.waitForCompletion(true);
+    assertTrue("Job Not failed", !job.isSuccessful());
+  }
+
+  // tests a chain whose first mapper consumes all input and emits nothing
+  public void testChainNoOutput() throws Exception {
+    Configuration conf = createJobConf();
+    String expectedOutput = "";
+
+    Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 0, input);
+    job.setJobName("chain");
+
+    ChainMapper.addMapper(job, AMap.class, IntWritable.class, Text.class,
+      LongWritable.class, Text.class, null);
+
+    ChainMapper.addMapper(job, BMap.class, LongWritable.class, Text.class,
+      LongWritable.class, Text.class, null);
+   
+    job.waitForCompletion(true);
+    assertTrue("Job failed", job.isSuccessful());
+    assertEquals("Outputs doesn't match", expectedOutput,
+      MapReduceTestUtil.readOutput(outDir, conf));
+  }
+
+
+  // this mapper consumes all the input and outputs nothing
+  public static class AMap 
+      extends Mapper<LongWritable, Text, LongWritable, Text> {
+    public void map(LongWritable key, Text value, 
+        Context context) throws IOException, InterruptedException {
+    }
+  }
+
+  // identity mapper, passes every input key/value pair straight through
+  public static class BMap 
+      extends Mapper<LongWritable, Text, LongWritable, Text> {
+  }
+
+  // this mapper throws an IOException for input value "b"
+  public static class CMap 
+      extends Mapper<LongWritable, Text, IntWritable, Text> {
+    protected void map(LongWritable key, Text value, 
+        Context context) throws IOException, InterruptedException {
+      if (value.toString().equals("b")) {
+        throw new IOException();
+      }
+    }
+  }
+  
+  // identity reducer, used only to exercise chain-submission type checking
+  public static class CReduce 
+      extends Reducer<LongWritable, Text, IntWritable, Text> {
+  }
+
+}

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestChainWordCount.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestChainWordCount.java?rev=803019&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestChainWordCount.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestChainWordCount.java Tue Aug 11 07:56:49 2009
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.chain;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;
+import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
+
+import java.io.IOException;
+
+public class TestChainWordCount extends HadoopTestCase {
+
+  private static String localPathRoot = 
+    System.getProperty("test.build.data", "/tmp");
+
+  public TestChainWordCount() throws IOException {
+    super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
+  }
+
+  // tests the chain mapper and reducer by adding a single mapper and a
+  // single reducer to the chain
+  public void testNoChain() throws Exception {
+    Path inDir = new Path(localPathRoot, "testing/chain/input");
+    Path outDir = new Path(localPathRoot, "testing/chain/output");
+    String input = "a\nb\na\n";
+    String expectedOutput = "a\t2\nb\t1\n";
+
+    Configuration conf = createJobConf();
+
+    Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 1, input);
+    job.setJobName("chain");
+
+    ChainMapper.addMapper(job, TokenCounterMapper.class, Object.class,
+      Text.class, Text.class, IntWritable.class, null);
+
+    ChainReducer.setReducer(job, IntSumReducer.class, Text.class,
+      IntWritable.class, Text.class, IntWritable.class, null);
+
+    job.waitForCompletion(true);
+    assertTrue("Job failed", job.isSuccessful());
+    assertEquals("Outputs doesn't match", expectedOutput,
+      MapReduceTestUtil.readOutput(outDir, conf));
+  }
+
+}

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestMapReduceChain.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestMapReduceChain.java?rev=803019&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestMapReduceChain.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestMapReduceChain.java Tue Aug 11 07:56:49 2009
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.chain;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public class TestMapReduceChain extends HadoopTestCase {
+
+  private static String localPathRoot = 
+    System.getProperty("test.build.data", "/tmp");
+  private static Path flagDir = new Path(localPathRoot, "testing/chain/flags");
+
+  private static void cleanFlags(Configuration conf) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    fs.delete(flagDir, true);
+    fs.mkdirs(flagDir);
+  }
+
+  private static void writeFlag(Configuration conf, String flag) 
+      throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    if (getFlag(conf, flag)) {
+      fail("Flag " + flag + " already exists");
+    }
+    DataOutputStream file = fs.create(new Path(flagDir, flag));
+    file.close();
+  }
+
+  private static boolean getFlag(Configuration conf, String flag) 
+      throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    return fs.exists(new Path(flagDir, flag));
+  }
+
+  public TestMapReduceChain() throws IOException {
+    super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
+  }
+
+  // tests a full chain: three mappers, a reducer and three post-reduce
+  // mappers, verifying setup/map/reduce/cleanup flags and conf precedence
+  public void testChain() throws Exception {
+    Path inDir = new Path(localPathRoot, "testing/chain/input");
+    Path outDir = new Path(localPathRoot, "testing/chain/output");
+    String input = "1\n2\n";
+
+    Configuration conf = createJobConf();
+    cleanFlags(conf);
+    conf.set("a", "X");
+
+    Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 1, input);
+    job.setJobName("chain");
+
+    Configuration mapAConf = new Configuration(false);
+    mapAConf.set("a", "A");
+    ChainMapper.addMapper(job, AMap.class, LongWritable.class, Text.class,
+      LongWritable.class, Text.class, mapAConf);
+
+    ChainMapper.addMapper(job, BMap.class, LongWritable.class, Text.class,
+      LongWritable.class, Text.class, null);
+   
+    ChainMapper.addMapper(job, CMap.class, LongWritable.class, Text.class,
+      LongWritable.class, Text.class, null);
+    
+    Configuration reduceConf = new Configuration(false);
+    reduceConf.set("a", "C");
+    ChainReducer.setReducer(job, CReduce.class, LongWritable.class,
+      Text.class, LongWritable.class, Text.class, reduceConf);
+
+    ChainReducer.addMapper(job, DMap.class, LongWritable.class, Text.class,
+      LongWritable.class, Text.class,  null);
+
+    Configuration mapEConf = new Configuration(false);
+    mapEConf.set("a", "E");
+    ChainReducer.addMapper(job, EMap.class, LongWritable.class, Text.class,
+      LongWritable.class, Text.class, mapEConf);
+
+    ChainReducer.addMapper(job, FMap.class, LongWritable.class, Text.class,
+      LongWritable.class, Text.class,  null);
+
+    job.waitForCompletion(true);
+    assertTrue("Job failed", job.isSuccessful());
+
+    String str = "flag not set";
+    assertTrue(str, getFlag(conf, "map.setup.A"));
+    assertTrue(str, getFlag(conf, "map.setup.B"));
+    assertTrue(str, getFlag(conf, "map.setup.C"));
+    assertTrue(str, getFlag(conf, "reduce.setup.C"));
+    assertTrue(str, getFlag(conf, "map.setup.D"));
+    assertTrue(str, getFlag(conf, "map.setup.E"));
+    assertTrue(str, getFlag(conf, "map.setup.F"));
+
+    assertTrue(str, getFlag(conf, "map.A.value.1"));
+    assertTrue(str, getFlag(conf, "map.A.value.2"));
+    assertTrue(str, getFlag(conf, "map.B.value.1"));
+    assertTrue(str, getFlag(conf, "map.B.value.2"));
+    assertTrue(str, getFlag(conf, "map.C.value.1"));
+    assertTrue(str, getFlag(conf, "map.C.value.2"));
+    assertTrue(str, getFlag(conf, "reduce.C.value.1"));
+    assertTrue(str, getFlag(conf, "reduce.C.value.2"));
+    assertTrue(str, getFlag(conf, "map.D.value.1"));
+    assertTrue(str, getFlag(conf, "map.D.value.2"));
+    assertTrue(str, getFlag(conf, "map.E.value.1"));
+    assertTrue(str, getFlag(conf, "map.E.value.2"));
+    assertTrue(str, getFlag(conf, "map.F.value.1"));
+    assertTrue(str, getFlag(conf, "map.F.value.2"));
+
+    assertTrue(getFlag(conf, "map.cleanup.A"));
+    assertTrue(getFlag(conf, "map.cleanup.B"));
+    assertTrue(getFlag(conf, "map.cleanup.C"));
+    assertTrue(getFlag(conf, "reduce.cleanup.C"));
+    assertTrue(getFlag(conf, "map.cleanup.D"));
+    assertTrue(getFlag(conf, "map.cleanup.E"));
+    assertTrue(getFlag(conf, "map.cleanup.F"));
+  }
+
+  public static class AMap extends IDMap {
+    public AMap() {
+      super("A", "A");
+    }
+  }
+
+  public static class BMap extends IDMap {
+    public BMap() {
+      super("B", "X");
+    }
+  }
+
+  public static class CMap extends IDMap {
+    public CMap() {
+      super("C", "X");
+    }
+  }
+  
+  public static class CReduce extends IDReduce {
+    public CReduce() {
+      super("C", "C");
+    }
+  }
+
+  public static class DMap extends IDMap {
+    public DMap() {
+      super("D", "X");
+    }
+  }
+
+  public static class EMap extends IDMap {
+    public EMap() {
+      super("E", "E");
+    }
+  }
+
+  public static class FMap extends IDMap {
+    public FMap() {
+      super("F", "X");
+    }
+  }
+
+  public static class IDMap
+      extends Mapper<LongWritable, Text, LongWritable, Text> {
+    private String name;
+    private String prop;
+
+    public IDMap(String name, String prop) {
+      this.name = name;
+      this.prop = prop;
+    }
+
+    public void setup(Context context) 
+        throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      assertEquals(prop, conf.get("a"));
+      writeFlag(conf, "map.setup." + name);
+    }
+
+    public void map(LongWritable key, Text value, Context context)
+        throws IOException, InterruptedException {
+      writeFlag(context.getConfiguration(), "map." + name + ".value." + value);
+      key.set(10);
+      context.write(key, value);
+    }
+
+    public void cleanup(Context context) 
+        throws IOException, InterruptedException {
+      writeFlag(context.getConfiguration(), "map.cleanup." + name);
+    }
+  }
+
+  public static class IDReduce
+      extends Reducer<LongWritable, Text, LongWritable, Text> {
+
+    private String name;
+    private String prop;
+
+    public IDReduce(String name, String prop) {
+      this.name = name;
+      this.prop = prop;
+    }
+
+    public void setup(Context context)
+        throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      assertEquals(prop, conf.get("a"));
+      writeFlag(conf, "reduce.setup." + name);
+    }
+
+    public void reduce(LongWritable key, Iterable<Text> values,
+        Context context) throws IOException, InterruptedException  {
+      for (Text value : values) {
+        writeFlag(context.getConfiguration(),
+          "reduce." + name + ".value." + value);
+        key.set(10);
+        context.write(key, value);
+      }
+    }
+
+    public void cleanup(Context context) 
+        throws IOException, InterruptedException {
+      writeFlag(context.getConfiguration(), "reduce.cleanup." + name);
+    }
+  }
+}

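TestMapReduceChain above verifies, through the "a" flags, that each chain
element's private Configuration takes precedence over the job's Configuration
while the task runs. The toy overlay below illustrates that precedence rule
only; the helper is hypothetical and does not reproduce Chain's actual
mechanism of serializing each element's conf into the job conf.

import java.util.Map;

import org.apache.hadoop.conf.Configuration;

public class ConfOverlaySketch {

  // returns a copy of the job conf in which the element conf's entries win
  static Configuration overlay(Configuration job, Configuration element) {
    Configuration merged = new Configuration(job); // copy of the job conf
    for (Map.Entry<String, String> entry : element) {
      merged.set(entry.getKey(), entry.getValue()); // element conf wins
    }
    return merged;
  }

  public static void main(String[] args) {
    Configuration jobConf = new Configuration(false);
    jobConf.set("a", "X"); // as conf.set("a", "X") in testChain()
    Configuration mapAConf = new Configuration(false);
    mapAConf.set("a", "A"); // as mapAConf.set("a", "A") in testChain()
    // prints A, matching assertEquals(prop, conf.get("a")) in AMap.setup
    System.out.println(overlay(jobConf, mapAConf).get("a"));
  }
}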

