hadoop-mapreduce-commits mailing list archives

From sha...@apache.org
Subject svn commit: r896750 [1/2] - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/lib/ src/java/org/apache/hadoop/mapreduce/lib/chain/ src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/
Date Thu, 07 Jan 2010 04:08:16 GMT
Author: sharad
Date: Thu Jan  7 04:06:55 2010
New Revision: 896750

URL: http://svn.apache.org/viewvc?rev=896750&view=rev
Log:
MAPREDUCE-372. Change org.apache.hadoop.mapred.lib.ChainMapper/Reducer to use new mapreduce api. Contributed by Amareshwari Sriramadasu.
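
For readers skimming the archive: a minimal sketch of how a job wires up the new chain API introduced here. It is illustrative only; it assumes the public ChainMapper.addMapper/ChainReducer.setReducer wrappers mirror the protected helpers visible in the new Chain class below, and AMap, BMap and XReduce are hypothetical Mapper/Reducer implementations.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
    import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;

    public class ChainExample {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration());
        job.setJobName("chain-example");
        // the map task runs AMap then BMap; the reduce task runs XReduce.
        // Each call declares the element's input/output key and value classes;
        // the trailing null means "no private Configuration for this element".
        ChainMapper.addMapper(job, AMap.class, LongWritable.class, Text.class,
            Text.class, Text.class, null);
        ChainMapper.addMapper(job, BMap.class, Text.class, Text.class,
            Text.class, Text.class, null);
        ChainReducer.setReducer(job, XReduce.class, Text.class, Text.class,
            Text.class, Text.class, null);
        // input/output paths and formats omitted for brevity
        job.waitForCompletion(true);
      }
    }

The chain code validates this wiring at configuration time: each element's input key/value classes must be assignable from the previous element's output classes (see validateKeyValueTypes in the diff below).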

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/Chain.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapper.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReducer.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestChainErrors.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestMapReduceChain.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestSingleElementChain.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/Chain.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/ChainReducer.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=896750&r1=896749&r2=896750&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Jan  7 04:06:55 2010
@@ -97,6 +97,9 @@
     been created by user tasks with non-writable permissions.
     (Ravi Gummadi via yhemanth)
 
+    MAPREDUCE-372. Change org.apache.hadoop.mapred.lib.ChainMapper/Reducer 
+    to use new mapreduce api. (Amareshwari Sriramadasu via sharad)
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/Chain.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/Chain.java?rev=896750&r1=896749&r2=896750&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/Chain.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/Chain.java Thu Jan  7 04:06:55 2010
@@ -18,8 +18,6 @@
 package org.apache.hadoop.mapred.lib;
 
 import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Stringifier;
-import org.apache.hadoop.io.DefaultStringifier;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.Serialization;
 import org.apache.hadoop.io.serializer.SerializationFactory;
@@ -32,45 +30,19 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 
 /**
  * The Chain class provides all the common functionality for the
  * {@link ChainMapper} and the {@link ChainReducer} classes.
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.lib.chain.Chain} instead
  */
-class Chain {
-  private static final String CHAIN_MAPPER = "chain.mapper";
-  private static final String CHAIN_REDUCER = "chain.reducer";
-
-  private static final String CHAIN_MAPPER_SIZE = ".size";
-  private static final String CHAIN_MAPPER_CLASS = ".mapper.class.";
-  private static final String CHAIN_MAPPER_CONFIG = ".mapper.config.";
-  private static final String CHAIN_REDUCER_CLASS = ".reducer.class";
-  private static final String CHAIN_REDUCER_CONFIG = ".reducer.config";
+@Deprecated
+class Chain extends org.apache.hadoop.mapreduce.lib.chain.Chain {
 
   private static final String MAPPER_BY_VALUE = "chain.mapper.byValue";
   private static final String REDUCER_BY_VALUE = "chain.reducer.byValue";
 
-  private static final String MAPPER_INPUT_KEY_CLASS =
-    "chain.mapper.input.key.class";
-  private static final String MAPPER_INPUT_VALUE_CLASS =
-    "chain.mapper.input.value.class";
-  private static final String MAPPER_OUTPUT_KEY_CLASS =
-    "chain.mapper.output.key.class";
-  private static final String MAPPER_OUTPUT_VALUE_CLASS =
-    "chain.mapper.output.value.class";
-  private static final String REDUCER_INPUT_KEY_CLASS =
-    "chain.reducer.input.key.class";
-  private static final String REDUCER_INPUT_VALUE_CLASS =
-    "chain.reducer.input.value.class";
-  private static final String REDUCER_OUTPUT_KEY_CLASS =
-    "chain.reducer.output.key.class";
-  private static final String REDUCER_OUTPUT_VALUE_CLASS =
-    "chain.reducer.output.value.class";
-
-  private boolean isMap;
-
   private JobConf chainJobConf;
 
   private List<Mapper> mappers = new ArrayList<Mapper>();
@@ -92,51 +64,7 @@
    *              Reducer.
    */
   Chain(boolean isMap) {
-    this.isMap = isMap;
-  }
-
-  /**
-   * Returns the prefix to use for the configuration of the chain depending
-   * if it is for a Mapper or a Reducer.
-   *
-   * @param isMap TRUE for Mapper, FALSE for Reducer.
-   * @return the prefix to use.
-   */
-  private static String getPrefix(boolean isMap) {
-    return (isMap) ? CHAIN_MAPPER : CHAIN_REDUCER;
-  }
-
-  /**
-   * Creates a {@link JobConf} for one of the Maps or Reduce in the chain.
-   * <p/>
-   * It creates a new JobConf using the chain job's JobConf as base and adds to
-   * it the configuration properties for the chain element. The keys of the
-   * chain element jobConf have precedence over the given JobConf.
-   *
-   * @param jobConf the chain job's JobConf.
-   * @param confKey the key for chain element configuration serialized in the
-   *                chain job's JobConf.
-   * @return a new JobConf aggregating the chain job's JobConf with the chain
-   *         element configuration properties.
-   */
-  private static JobConf getChainElementConf(JobConf jobConf, String confKey) {
-    JobConf conf;
-    try {
-      Stringifier<JobConf> stringifier =
-        new DefaultStringifier<JobConf>(jobConf, JobConf.class);
-      conf = stringifier.fromString(jobConf.get(confKey, null));
-    } catch (IOException ioex) {
-      throw new RuntimeException(ioex);
-    }
-    // we have to do this because the Writable desearialization clears all
-    // values set in the conf making not possible do do a new JobConf(jobConf)
-    // in the creation of the conf above
-    jobConf = new JobConf(jobConf);
-
-    for(Map.Entry<String, String> entry : conf) {
-      jobConf.set(entry.getKey(), entry.getValue());
-    }
-    return jobConf;
+    super(isMap);
   }
 
   /**
@@ -169,82 +97,27 @@
     String prefix = getPrefix(isMap);
 
     // if a reducer chain check the Reducer has been already set
-    if (!isMap) {
-      if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS,
-                           Reducer.class) == null) {
-        throw new IllegalStateException(
-          "A Mapper can be added to the chain only after the Reducer has " +
-          "been set");
-      }
-    }
-    int index = jobConf.getInt(prefix + CHAIN_MAPPER_SIZE, 0);
+    checkReducerAlreadySet(isMap, jobConf, prefix, true);
+
+    // set the mapper class
+    int index = getIndex(jobConf, prefix);
     jobConf.setClass(prefix + CHAIN_MAPPER_CLASS + index, klass, Mapper.class);
-
-    // if it is a reducer chain and the first Mapper is being added check the
-    // key and value input classes of the mapper match those of the reducer
-    // output.
-    if (!isMap && index == 0) {
-      JobConf reducerConf =
-        getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG);
-      if (! inputKeyClass.isAssignableFrom(
-        reducerConf.getClass(REDUCER_OUTPUT_KEY_CLASS, null))) {
-        throw new IllegalArgumentException("The Reducer output key class does" +
-          " not match the Mapper input key class");
-      }
-      if (! inputValueClass.isAssignableFrom(
-        reducerConf.getClass(REDUCER_OUTPUT_VALUE_CLASS, null))) {
-        throw new IllegalArgumentException("The Reducer output value class" +
-          " does not match the Mapper input value class");
-      }
-    } else if (index > 0) {
-      // check the that the new Mapper in the chain key and value input classes
-      // match those of the previous Mapper output.
-      JobConf previousMapperConf =
-        getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG +
-          (index - 1));
-      if (! inputKeyClass.isAssignableFrom(
-        previousMapperConf.getClass(MAPPER_OUTPUT_KEY_CLASS, null))) {
-        throw new IllegalArgumentException("The Mapper output key class does" +
-          " not match the previous Mapper input key class");
-      }
-      if (! inputValueClass.isAssignableFrom(
-        previousMapperConf.getClass(MAPPER_OUTPUT_VALUE_CLASS, null))) {
-        throw new IllegalArgumentException("The Mapper output value class" +
-          " does not match the previous Mapper input value class");
-      }
-    }
-
+
+    validateKeyValueTypes(isMap, jobConf, inputKeyClass, inputValueClass,
+      outputKeyClass, outputValueClass, index, prefix);
+
     // if the Mapper does not have a private JobConf create an empty one
     if (mapperConf == null) {
-      // using a JobConf without defaults to make it lightweight.
-      // still the chain JobConf may have all defaults and this conf is
-      // overlapped to the chain JobConf one.
+      // using a JobConf without defaults to make it lightweight.
+      // still the chain JobConf may have all defaults and this conf is
+      // overlapped to the chain JobConf one.
       mapperConf = new JobConf(true);
     }
-
-    // store in the private mapper conf the input/output classes of the mapper
-    // and if it works by value or by reference
+    // store in the private mapper conf if it works by value or by reference
     mapperConf.setBoolean(MAPPER_BY_VALUE, byValue);
-    mapperConf.setClass(MAPPER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
-    mapperConf.setClass(MAPPER_INPUT_VALUE_CLASS, inputValueClass,
-                        Object.class);
-    mapperConf.setClass(MAPPER_OUTPUT_KEY_CLASS, outputKeyClass, Object.class);
-    mapperConf.setClass(MAPPER_OUTPUT_VALUE_CLASS, outputValueClass,
-                        Object.class);
-
-    // serialize the private mapper jobconf in the chain jobconf.
-    Stringifier<JobConf> stringifier =
-      new DefaultStringifier<JobConf>(jobConf, JobConf.class);
-    try {
-      jobConf.set(prefix + CHAIN_MAPPER_CONFIG + index,
-                  stringifier.toString(new JobConf(mapperConf)));
-    }
-    catch (IOException ioEx) {
-      throw new RuntimeException(ioEx);
-    }
-
-    // increment the chain counter
-    jobConf.setInt(prefix + CHAIN_MAPPER_SIZE, index + 1);
+
+    setMapperConf(isMap, jobConf, inputKeyClass, inputValueClass,
+      outputKeyClass, outputValueClass, mapperConf, index, prefix);
   }
 
   /**
@@ -273,13 +146,10 @@
                           Class<? extends V2> outputValueClass,
                           boolean byValue, JobConf reducerConf) {
     String prefix = getPrefix(false);
-
-    if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null) != null) {
-      throw new IllegalStateException("Reducer has been already set");
-    }
+    checkReducerAlreadySet(false, jobConf, prefix, false);
 
     jobConf.setClass(prefix + CHAIN_REDUCER_CLASS, klass, Reducer.class);
-
+
     // if the Reducer does not have a private JobConf create an empty one
     if (reducerConf == null) {
       // using a JobConf without defaults to make it lightweight.
@@ -291,24 +161,9 @@
     // store in the private reducer conf the input/output classes of the reducer
     // and if it works by value or by reference
     reducerConf.setBoolean(MAPPER_BY_VALUE, byValue);
-    reducerConf.setClass(REDUCER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
-    reducerConf.setClass(REDUCER_INPUT_VALUE_CLASS, inputValueClass,
-                         Object.class);
-    reducerConf.setClass(REDUCER_OUTPUT_KEY_CLASS, outputKeyClass,
-                         Object.class);
-    reducerConf.setClass(REDUCER_OUTPUT_VALUE_CLASS, outputValueClass,
-                         Object.class);
-
-    // serialize the private mapper jobconf in the chain jobconf.
-    Stringifier<JobConf> stringifier =
-      new DefaultStringifier<JobConf>(jobConf, JobConf.class);
-    try {
-      jobConf.set(prefix + CHAIN_REDUCER_CONFIG,
-                  stringifier.toString(new JobConf(reducerConf)));
-    }
-    catch (IOException ioEx) {
-      throw new RuntimeException(ioEx);
-    }
+
+    setReducerConf(jobConf, inputKeyClass, inputValueClass, outputKeyClass,
+      outputValueClass, reducerConf, prefix);
   }
 
   /**
@@ -325,8 +180,8 @@
     for (int i = 0; i < index; i++) {
       Class<? extends Mapper> klass =
         jobConf.getClass(prefix + CHAIN_MAPPER_CLASS + i, null, Mapper.class);
-      JobConf mConf =
-        getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG + i);
+      JobConf mConf = new JobConf(
+        getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG + i));
       Mapper mapper = ReflectionUtils.newInstance(klass, mConf);
       mappers.add(mapper);
 
@@ -343,8 +198,8 @@
     Class<? extends Reducer> klass =
       jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null, Reducer.class);
     if (klass != null) {
-      JobConf rConf =
-        getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG);
+      JobConf rConf = new JobConf(
+        getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG));
       reducer = ReflectionUtils.newInstance(klass, rConf);
       if (rConf.getBoolean(REDUCER_BY_VALUE, true)) {
         reducerKeySerialization = serializationFactory
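
The getChainElementConf logic removed above moves into the new shared base class (the added org.apache.hadoop.mapreduce.lib.chain.Chain below). The underlying trick, serializing a whole per-element Configuration into a single property of the job's Configuration via DefaultStringifier, can be sketched in isolation as follows; the property name is built from the new constants, and the snippet itself is not part of the commit.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.DefaultStringifier;

    public class ConfRoundTrip {
      public static void main(String[] args) throws Exception {
        Configuration jobConf = new Configuration();
        Configuration mapperConf = new Configuration(false);
        mapperConf.set("example.key", "example.value");

        DefaultStringifier<Configuration> stringifier =
            new DefaultStringifier<Configuration>(jobConf, Configuration.class);
        // serialize the private mapper conf into one job-conf property
        // (CHAIN_MAPPER + CHAIN_MAPPER_CONFIG + index)
        jobConf.set("mapreduce.chain.mapper.mapper.config.0",
            stringifier.toString(new Configuration(mapperConf)));
        // ...and recover it later, e.g. on the task side
        Configuration restored = stringifier.fromString(
            jobConf.get("mapreduce.chain.mapper.mapper.config.0"));
        System.out.println(restored.get("example.key")); // prints example.value
      }
    }

As the comment in the diff notes, Writable deserialization clears the target Configuration, which is why the chain code copies the recovered entries onto a fresh copy of the job's Configuration instead of deserializing over it.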

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java?rev=896750&r1=896749&r2=896750&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java Thu Jan  7 04:06:55 2010
@@ -86,7 +86,10 @@
  * RunningJob job = jc.submitJob(conf);
  * ...
  * </pre>
+ * @deprecated 
+ * Use {@link org.apache.hadoop.mapreduce.lib.chain.ChainMapper} instead
  */
+@Deprecated
 public class ChainMapper implements Mapper {
 
   /**

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/ChainReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/ChainReducer.java?rev=896750&r1=896749&r2=896750&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/ChainReducer.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/ChainReducer.java Thu Jan  7 04:06:55 2010
@@ -86,7 +86,10 @@
  * RunningJob job = jc.submitJob(conf);
  * ...
  * </pre>
+ * @deprecated 
+ * Use {@link org.apache.hadoop.mapreduce.lib.chain.ChainReducer} instead
  */
+@Deprecated
 public class ChainReducer implements Reducer {
 
   /**

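Both old-API classes above are now deprecated thin shells over the new implementation. For contrast with the new-API sketch near the top of this message, the deprecated JobConf-based usage (per the javadoc shown in these diffs) looked roughly like this; AMap and XReduce are again hypothetical classes.

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.ChainMapper;
    import org.apache.hadoop.mapred.lib.ChainReducer;

    public class OldChainExample {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(OldChainExample.class);
        conf.setJobName("chain-example-old");
        // the old API takes an explicit byValue flag and a per-element JobConf
        ChainMapper.addMapper(conf, AMap.class, LongWritable.class, Text.class,
            Text.class, Text.class, true, null);
        ChainReducer.setReducer(conf, XReduce.class, Text.class, Text.class,
            Text.class, Text.class, true, null);
        // input/output paths and formats omitted for brevity
        JobClient.runJob(conf);
      }
    }

Note that the byValue flag is exactly what the new API drops; its handling stays behind in the deprecated wrappers (see MAPPER_BY_VALUE/REDUCER_BY_VALUE in the Chain.java diff above).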
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/Chain.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/Chain.java?rev=896750&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/Chain.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/Chain.java Thu Jan  7 04:06:55 2010
@@ -0,0 +1,903 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.chain;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.Stringifier;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.ReduceContext;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
+import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * The Chain class provides all the common functionality for the
+ * {@link ChainMapper} and the {@link ChainReducer} classes.
+ */
+public class Chain {
+  protected static final String CHAIN_MAPPER = "mapreduce.chain.mapper";
+  protected static final String CHAIN_REDUCER = "mapreduce.chain.reducer";
+
+  protected static final String CHAIN_MAPPER_SIZE = ".size";
+  protected static final String CHAIN_MAPPER_CLASS = ".mapper.class.";
+  protected static final String CHAIN_MAPPER_CONFIG = ".mapper.config.";
+  protected static final String CHAIN_REDUCER_CLASS = ".reducer.class";
+  protected static final String CHAIN_REDUCER_CONFIG = ".reducer.config";
+
+  protected static final String MAPPER_INPUT_KEY_CLASS = 
+    "mapreduce.chain.mapper.input.key.class";
+  protected static final String MAPPER_INPUT_VALUE_CLASS = 
+    "mapreduce.chain.mapper.input.value.class";
+  protected static final String MAPPER_OUTPUT_KEY_CLASS = 
+    "mapreduce.chain.mapper.output.key.class";
+  protected static final String MAPPER_OUTPUT_VALUE_CLASS = 
+    "mapreduce.chain.mapper.output.value.class";
+  protected static final String REDUCER_INPUT_KEY_CLASS = 
+    "mapreduce.chain.reducer.input.key.class";
+  protected static final String REDUCER_INPUT_VALUE_CLASS = 
+    "maperduce.chain.reducer.input.value.class";
+  protected static final String REDUCER_OUTPUT_KEY_CLASS = 
+    "mapreduce.chain.reducer.output.key.class";
+  protected static final String REDUCER_OUTPUT_VALUE_CLASS = 
+    "mapreduce.chain.reducer.output.value.class";
+
+  protected boolean isMap;
+
+  @SuppressWarnings("unchecked")
+  private List<Mapper> mappers = new ArrayList<Mapper>();
+  private Reducer<?, ?, ?, ?> reducer;
+  private List<Configuration> confList = new ArrayList<Configuration>();
+  private Configuration rConf;
+  private List<Thread> threads = new ArrayList<Thread>();
+  private List<ChainBlockingQueue<?>> blockingQueues = 
+    new ArrayList<ChainBlockingQueue<?>>();
+  private Throwable throwable = null;
+
+  /**
+   * Creates a Chain instance configured for a Mapper or a Reducer.
+   * 
+   * @param isMap
+   *          TRUE indicates the chain is for a Mapper, FALSE that it is for a
+   *          Reducer.
+   */
+  protected Chain(boolean isMap) {
+    this.isMap = isMap;
+  }
+
+  static class KeyValuePair<K, V> {
+    K key;
+    V value;
+    boolean endOfInput;
+
+    KeyValuePair(K key, V value) {
+      this.key = key;
+      this.value = value;
+      this.endOfInput = false;
+    }
+
+    KeyValuePair(boolean eof) {
+      this.key = null;
+      this.value = null;
+      this.endOfInput = eof;
+    }
+  }
+
+  // ChainRecordReader reads either from a blocking queue or from the task context.
+  private static class ChainRecordReader<KEYIN, VALUEIN> extends
+      RecordReader<KEYIN, VALUEIN> {
+    private Class<?> keyClass;
+    private Class<?> valueClass;
+    private KEYIN key;
+    private VALUEIN value;
+    private Configuration conf;
+    TaskInputOutputContext<KEYIN, VALUEIN, ?, ?> inputContext = null;
+    ChainBlockingQueue<KeyValuePair<KEYIN, VALUEIN>> inputQueue = null;
+
+    // constructor to read from a blocking queue
+    ChainRecordReader(Class<?> keyClass, Class<?> valueClass,
+        ChainBlockingQueue<KeyValuePair<KEYIN, VALUEIN>> inputQueue,
+        Configuration conf) {
+      this.keyClass = keyClass;
+      this.valueClass = valueClass;
+      this.inputQueue = inputQueue;
+      this.conf = conf;
+    }
+
+    // constructor to read from the context
+    ChainRecordReader(TaskInputOutputContext<KEYIN, VALUEIN, ?, ?> context) {
+      inputContext = context;
+    }
+
+    public void initialize(InputSplit split, TaskAttemptContext context)
+        throws IOException, InterruptedException {
+    }
+
+    /**
+     * Advance to the next key/value pair.
+     * 
+     * @return true if a key/value pair was read, false if at end of input
+     */
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      if (inputQueue != null) {
+        return readFromQueue();
+      } else if (inputContext.nextKeyValue()) {
+        this.key = inputContext.getCurrentKey();
+        this.value = inputContext.getCurrentValue();
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    @SuppressWarnings("unchecked")
+    private boolean readFromQueue() throws IOException, InterruptedException {
+      KeyValuePair<KEYIN, VALUEIN> kv = null;
+
+      // wait for input on queue
+      kv = inputQueue.dequeue();
+      if (kv.endOfInput) {
+        return false;
+      }
+      key = (KEYIN) ReflectionUtils.newInstance(keyClass, conf);
+      value = (VALUEIN) ReflectionUtils.newInstance(valueClass, conf);
+      ReflectionUtils.copy(conf, kv.key, this.key);
+      ReflectionUtils.copy(conf, kv.value, this.value);
+      return true;
+    }
+
+    /**
+     * Get the current key.
+     * 
+     * @return the current key object or null if there isn't one
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public KEYIN getCurrentKey() throws IOException, InterruptedException {
+      return this.key;
+    }
+
+    /**
+     * Get the current value.
+     * 
+     * @return the value object that was read into
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public VALUEIN getCurrentValue() throws IOException, InterruptedException {
+      return this.value;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return 0;
+    }
+  }
+
+  // ChainRecordWriter writes either to a blocking queue or to the task context.
+
+  private static class ChainRecordWriter<KEYOUT, VALUEOUT> extends
+      RecordWriter<KEYOUT, VALUEOUT> {
+    TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> outputContext = null;
+    ChainBlockingQueue<KeyValuePair<KEYOUT, VALUEOUT>> outputQueue = null;
+    KEYOUT keyout;
+    VALUEOUT valueout;
+    Configuration conf;
+    Class<?> keyClass;
+    Class<?> valueClass;
+
+    // constructor to write to context
+    ChainRecordWriter(TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
+      outputContext = context;
+    }
+
+    // constructor to write to blocking queue
+    ChainRecordWriter(Class<?> keyClass, Class<?> valueClass,
+        ChainBlockingQueue<KeyValuePair<KEYOUT, VALUEOUT>> output,
+        Configuration conf) {
+      this.keyClass = keyClass;
+      this.valueClass = valueClass;
+      this.outputQueue = output;
+      this.conf = conf;
+    }
+
+    /**
+     * Writes a key/value pair.
+     * 
+     * @param key
+     *          the key to write.
+     * @param value
+     *          the value to write.
+     * @throws IOException
+     */
+    public void write(KEYOUT key, VALUEOUT value) throws IOException,
+        InterruptedException {
+      if (outputQueue != null) {
+        writeToQueue(key, value);
+      } else {
+        outputContext.write(key, value);
+      }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void writeToQueue(KEYOUT key, VALUEOUT value) throws IOException,
+        InterruptedException {
+      this.keyout = (KEYOUT) ReflectionUtils.newInstance(keyClass, conf);
+      this.valueout = (VALUEOUT) ReflectionUtils.newInstance(valueClass, conf);
+      ReflectionUtils.copy(conf, key, this.keyout);
+      ReflectionUtils.copy(conf, value, this.valueout);
+
+      // wait to write output to the queue
+      outputQueue.enqueue(new KeyValuePair<KEYOUT, VALUEOUT>(keyout, valueout));
+    }
+
+    /**
+     * Close this <code>RecordWriter</code> to future operations.
+     * 
+     * @param context
+     *          the context of the task
+     * @throws IOException
+     */
+    public void close(TaskAttemptContext context) throws IOException,
+        InterruptedException {
+      if (outputQueue != null) {
+        // write end of input
+        outputQueue.enqueue(new KeyValuePair<KEYOUT, VALUEOUT>(true));
+      }
+    }
+
+  }
+
+  private synchronized Throwable getThrowable() {
+    return throwable;
+  }
+
+  private synchronized boolean setIfUnsetThrowable(Throwable th) {
+    if (throwable == null) {
+      throwable = th;
+      return true;
+    }
+    return false;
+  }
+
+  private class MapRunner<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Thread {
+    private Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapper;
+    private Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context chainContext;
+    private RecordReader<KEYIN, VALUEIN> rr;
+    private RecordWriter<KEYOUT, VALUEOUT> rw;
+
+    public MapRunner(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapper,
+        Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context mapperContext,
+        RecordReader<KEYIN, VALUEIN> rr, RecordWriter<KEYOUT, VALUEOUT> rw)
+        throws IOException, InterruptedException {
+      this.mapper = mapper;
+      this.rr = rr;
+      this.rw = rw;
+      this.chainContext = mapperContext;
+    }
+
+    @Override
+    public void run() {
+      if (getThrowable() != null) {
+        return;
+      }
+      try {
+        mapper.run(chainContext);
+        rr.close();
+        rw.close(chainContext);
+      } catch (Throwable th) {
+        if (setIfUnsetThrowable(th)) {
+          interruptAllThreads();
+        }
+      }
+    }
+  }
+
+  private class ReduceRunner<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Thread {
+    private Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reducer;
+    private Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context chainContext;
+    private RecordWriter<KEYOUT, VALUEOUT> rw;
+
+    ReduceRunner(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context,
+        Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reducer,
+        RecordWriter<KEYOUT, VALUEOUT> rw) throws IOException,
+        InterruptedException {
+      this.reducer = reducer;
+      this.chainContext = context;
+      this.rw = rw;
+    }
+
+    @Override
+    public void run() {
+      try {
+        reducer.run(chainContext);
+        rw.close(chainContext);
+      } catch (Throwable th) {
+        if (setIfUnsetThrowable(th)) {
+          interruptAllThreads();
+        }
+      }
+    }
+  }
+
+  Configuration getConf(int index) {
+    return confList.get(index);
+  }
+
+  /**
+   * Create a map context that is based on ChainMapContextImpl and the given
+   * record reader and record writer.
+   */
+  private <KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
+  Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createMapContext(
+      RecordReader<KEYIN, VALUEIN> rr, RecordWriter<KEYOUT, VALUEOUT> rw,
+      TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context,
+      Configuration conf) {
+    MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext = 
+      new ChainMapContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(
+        context, rr, rw, conf);
+    Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context mapperContext = 
+      new WrappedMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()
+        .getMapContext(mapContext);
+    return mapperContext;
+  }
+
+  @SuppressWarnings("unchecked")
+  void runMapper(TaskInputOutputContext context, int index) throws IOException,
+      InterruptedException {
+    Mapper mapper = mappers.get(index);
+    RecordReader rr = new ChainRecordReader(context);
+    RecordWriter rw = new ChainRecordWriter(context);
+    Mapper.Context mapperContext = createMapContext(rr, rw, context,
+        getConf(index));
+    mapper.run(mapperContext);
+    rr.close();
+    rw.close(context);
+  }
+
+  /**
+   * Add the first mapper in the chain: it reads input from the input
+   * context and writes to a queue.
+   */
+  @SuppressWarnings("unchecked")
+  void addMapper(TaskInputOutputContext inputContext,
+      ChainBlockingQueue<KeyValuePair<?, ?>> output, int index)
+      throws IOException, InterruptedException {
+    Configuration conf = getConf(index);
+    Class<?> keyOutClass = conf.getClass(MAPPER_OUTPUT_KEY_CLASS, Object.class);
+    Class<?> valueOutClass = conf.getClass(MAPPER_OUTPUT_VALUE_CLASS,
+        Object.class);
+
+    RecordReader rr = new ChainRecordReader(inputContext);
+    RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass, output,
+        conf);
+    Mapper.Context mapperContext = createMapContext(rr, rw,
+        (MapContext) inputContext, getConf(index));
+    MapRunner runner = new MapRunner(mappers.get(index), mapperContext, rr, rw);
+    threads.add(runner);
+  }
+
+  /**
+   * Add the last mapper in the chain: it reads input from a
+   * queue and writes output to the output context.
+   */
+  @SuppressWarnings("unchecked")
+  void addMapper(ChainBlockingQueue<KeyValuePair<?, ?>> input,
+      TaskInputOutputContext outputContext, int index) throws IOException,
+      InterruptedException {
+    Configuration conf = getConf(index);
+    Class<?> keyClass = conf.getClass(MAPPER_INPUT_KEY_CLASS, Object.class);
+    Class<?> valueClass = conf.getClass(MAPPER_INPUT_VALUE_CLASS, Object.class);
+    RecordReader rr = new ChainRecordReader(keyClass, valueClass, input, conf);
+    RecordWriter rw = new ChainRecordWriter(outputContext);
+    MapRunner runner = new MapRunner(mappers.get(index), createMapContext(rr,
+        rw, outputContext, getConf(index)), rr, rw);
+    threads.add(runner);
+  }
+
+  /**
+   * Add an intermediate mapper that both reads from and writes to a queue.
+   */
+  @SuppressWarnings("unchecked")
+  void addMapper(ChainBlockingQueue<KeyValuePair<?, ?>> input,
+      ChainBlockingQueue<KeyValuePair<?, ?>> output,
+      TaskInputOutputContext context, int index) throws IOException,
+      InterruptedException {
+    Configuration conf = getConf(index);
+    Class<?> keyClass = conf.getClass(MAPPER_INPUT_KEY_CLASS, Object.class);
+    Class<?> valueClass = conf.getClass(MAPPER_INPUT_VALUE_CLASS, Object.class);
+    Class<?> keyOutClass = conf.getClass(MAPPER_OUTPUT_KEY_CLASS, Object.class);
+    Class<?> valueOutClass = conf.getClass(MAPPER_OUTPUT_VALUE_CLASS,
+        Object.class);
+    RecordReader rr = new ChainRecordReader(keyClass, valueClass, input, conf);
+    RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass, output,
+        conf);
+    MapRunner runner = new MapRunner(mappers.get(index), createMapContext(rr,
+        rw, context, getConf(index)), rr, rw);
+    threads.add(runner);
+  }
+
+  /**
+   * Create a reduce context that is based on ChainReduceContextImpl and the
+   * given record writer.
+   */
+  private <KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
+  Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createReduceContext(
+      RecordWriter<KEYOUT, VALUEOUT> rw,
+      ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context,
+      Configuration conf) {
+    ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext = 
+      new ChainReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(
+          context, rw, conf);
+    Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context reducerContext = 
+      new WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()
+        .getReducerContext(reduceContext);
+    return reducerContext;
+  }
+
+  // Run the reducer directly.
+  @SuppressWarnings("unchecked")
+  <KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runReducer(
+      TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context)
+      throws IOException, InterruptedException {
+    RecordWriter<KEYOUT, VALUEOUT> rw = new ChainRecordWriter<KEYOUT, VALUEOUT>(
+        context);
+    Reducer.Context reducerContext = createReduceContext(rw,
+        (ReduceContext) context, rConf);
+    reducer.run(reducerContext);
+    rw.close(context);
+  }
+
+  /**
+   * Add a reducer that reads from the context and writes to a queue.
+   */
+  @SuppressWarnings("unchecked")
+  void addReducer(TaskInputOutputContext inputContext,
+      ChainBlockingQueue<KeyValuePair<?, ?>> outputQueue) throws IOException,
+      InterruptedException {
+
+    Class<?> keyOutClass = rConf.getClass(REDUCER_OUTPUT_KEY_CLASS,
+        Object.class);
+    Class<?> valueOutClass = rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS,
+        Object.class);
+    RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass,
+        outputQueue, rConf);
+    Reducer.Context reducerContext = createReduceContext(rw,
+        (ReduceContext) inputContext, rConf);
+    ReduceRunner runner = new ReduceRunner(reducerContext, reducer, rw);
+    threads.add(runner);
+  }
+
+  // start all the threads
+  void startAllThreads() {
+    for (Thread thread : threads) {
+      thread.start();
+    }
+  }
+  
+  // wait till all threads finish
+  void joinAllThreads() throws IOException, InterruptedException {
+    for (Thread thread : threads) {
+      thread.join();
+    }
+    Throwable th = getThrowable();
+    if (th != null) {
+      if (th instanceof IOException) {
+        throw (IOException) th;
+      } else if (th instanceof InterruptedException) {
+        throw (InterruptedException) th;
+      } else {
+        throw new RuntimeException(th);
+      }
+    }
+  }
+
+  // interrupt all threads
+  private synchronized void interruptAllThreads() {
+    for (Thread th : threads) {
+      th.interrupt();
+    }
+    for (ChainBlockingQueue<?> queue : blockingQueues) {
+      queue.interrupt();
+    }
+  }
+
+  /**
+   * Returns the prefix to use for the configuration of the chain, depending on
+   * whether it is for a Mapper or a Reducer.
+   * 
+   * @param isMap
+   *          TRUE for Mapper, FALSE for Reducer.
+   * @return the prefix to use.
+   */
+  protected static String getPrefix(boolean isMap) {
+    return (isMap) ? CHAIN_MAPPER : CHAIN_REDUCER;
+  }
+
+  protected static int getIndex(Configuration conf, String prefix) {
+    return conf.getInt(prefix + CHAIN_MAPPER_SIZE, 0);
+  }
+
+  /**
+   * Creates a {@link Configuration} for the Map or Reduce in the chain.
+   * 
+   * <p>
+   * It creates a new Configuration using the chain job's Configuration as base
+   * and adds to it the configuration properties for the chain element. The keys
+   * of the chain element Configuration have precedence over the given
+   * Configuration.
+   * </p>
+   * 
+   * @param jobConf
+   *          the chain job's Configuration.
+   * @param confKey
+   *          the key for chain element configuration serialized in the chain
+   *          job's Configuration.
+   * @return a new Configuration aggregating the chain job's Configuration with
+   *         the chain element configuration properties.
+   */
+  protected static Configuration getChainElementConf(Configuration jobConf,
+      String confKey) {
+    Configuration conf = null;
+    try {
+      Stringifier<Configuration> stringifier = 
+        new DefaultStringifier<Configuration>(jobConf, Configuration.class);
+      String confString = jobConf.get(confKey, null);
+      if (confString != null) {
+        conf = stringifier.fromString(confString);
+      }
+    } catch (IOException ioex) {
+      throw new RuntimeException(ioex);
+    }
+    // we have to do this because the Writable deserialization clears all
+    // values set in the conf, making it impossible to do a
+    // new Configuration(jobConf) in the creation of the conf above
+    jobConf = new Configuration(jobConf);
+
+    if (conf != null) {
+      for (Map.Entry<String, String> entry : conf) {
+        jobConf.set(entry.getKey(), entry.getValue());
+      }
+    }
+    return jobConf;
+  }
+
+  /**
+   * Adds a Mapper class to the chain job.
+   * 
+   * <p/>
+   * The configuration properties of the chain job have precedence over the
+   * configuration properties of the Mapper.
+   * 
+   * @param isMap
+   *          indicates if the Chain is for a Mapper or for a Reducer.
+   * @param job
+   *          chain job.
+   * @param klass
+   *          the Mapper class to add.
+   * @param inputKeyClass
+   *          mapper input key class.
+   * @param inputValueClass
+   *          mapper input value class.
+   * @param outputKeyClass
+   *          mapper output key class.
+   * @param outputValueClass
+   *          mapper output value class.
+   * @param mapperConf
+   *          a configuration for the Mapper class. It is recommended to use a
+   *          Configuration without default values using the
+   *          <code>Configuration(boolean loadDefaults)</code> constructor with
+   *          FALSE.
+   */
+  @SuppressWarnings("unchecked")
+  protected static void addMapper(boolean isMap, Job job,
+      Class<? extends Mapper> klass, Class<?> inputKeyClass,
+      Class<?> inputValueClass, Class<?> outputKeyClass,
+      Class<?> outputValueClass, Configuration mapperConf) {
+    String prefix = getPrefix(isMap);
+    Configuration jobConf = job.getConfiguration();
+
+    // if this is a reducer chain, check that the Reducer has already been set
+    checkReducerAlreadySet(isMap, jobConf, prefix, true);
+
+    // set the mapper class
+    int index = getIndex(jobConf, prefix);
+    jobConf.setClass(prefix + CHAIN_MAPPER_CLASS + index, klass, Mapper.class);
+
+    validateKeyValueTypes(isMap, jobConf, inputKeyClass, inputValueClass,
+        outputKeyClass, outputValueClass, index, prefix);
+
+    setMapperConf(isMap, jobConf, inputKeyClass, inputValueClass,
+        outputKeyClass, outputValueClass, mapperConf, index, prefix);
+  }
+
+  // for a reducer chain, check whether the Reducer has already been set
+  protected static void checkReducerAlreadySet(boolean isMap,
+      Configuration jobConf, String prefix, boolean shouldSet) {
+    if (!isMap) {
+      if (shouldSet) {
+        if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null) == null) {
+          throw new IllegalStateException(
+              "A Mapper can be added to the chain only after the Reducer has "
+                  + "been set");
+        }
+      } else {
+        if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null) != null) {
+          throw new IllegalStateException("Reducer has been already set");
+        }
+      }
+    }
+  }
+
+  protected static void validateKeyValueTypes(boolean isMap,
+      Configuration jobConf, Class<?> inputKeyClass, Class<?> inputValueClass,
+      Class<?> outputKeyClass, Class<?> outputValueClass, int index,
+      String prefix) {
+    // if it is a reducer chain and the first Mapper is being added check the
+    // key and value input classes of the mapper match those of the reducer
+    // output.
+    if (!isMap && index == 0) {
+      Configuration reducerConf = getChainElementConf(jobConf, prefix
+          + CHAIN_REDUCER_CONFIG);
+      if (!inputKeyClass.isAssignableFrom(reducerConf.getClass(
+          REDUCER_OUTPUT_KEY_CLASS, null))) {
+        throw new IllegalArgumentException("The Reducer output key class does"
+            + " not match the Mapper input key class");
+      }
+      if (!inputValueClass.isAssignableFrom(reducerConf.getClass(
+          REDUCER_OUTPUT_VALUE_CLASS, null))) {
+        throw new IllegalArgumentException("The Reducer output value class"
+            + " does not match the Mapper input value class");
+      }
+    } else if (index > 0) {
+      // check that the key and value input classes of the new Mapper in the
+      // chain match those of the previous Mapper's output.
+      Configuration previousMapperConf = getChainElementConf(jobConf, prefix
+          + CHAIN_MAPPER_CONFIG + (index - 1));
+      if (!inputKeyClass.isAssignableFrom(previousMapperConf.getClass(
+          MAPPER_OUTPUT_KEY_CLASS, null))) {
+        throw new IllegalArgumentException("The Mapper output key class does"
+            + " not match the previous Mapper input key class");
+      }
+      if (!inputValueClass.isAssignableFrom(previousMapperConf.getClass(
+          MAPPER_OUTPUT_VALUE_CLASS, null))) {
+        throw new IllegalArgumentException("The Mapper output value class"
+            + " does not match the previous Mapper input value class");
+      }
+    }
+  }
+
+  protected static void setMapperConf(boolean isMap, Configuration jobConf,
+      Class<?> inputKeyClass, Class<?> inputValueClass,
+      Class<?> outputKeyClass, Class<?> outputValueClass,
+      Configuration mapperConf, int index, String prefix) {
+    // if the Mapper does not have a configuration, create an empty one
+    if (mapperConf == null) {
+      // using a Configuration without defaults to make it lightweight.
+      // still the chain's conf may have all defaults and this conf is
+      // overlapped to the chain configuration one.
+      mapperConf = new Configuration(false);
+    }
+
+    // store the input/output classes of the mapper in the mapper conf
+    mapperConf.setClass(MAPPER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
+    mapperConf
+        .setClass(MAPPER_INPUT_VALUE_CLASS, inputValueClass, Object.class);
+    mapperConf.setClass(MAPPER_OUTPUT_KEY_CLASS, outputKeyClass, Object.class);
+    mapperConf.setClass(MAPPER_OUTPUT_VALUE_CLASS, outputValueClass,
+        Object.class);
+    // serialize the mapper configuration in the chain configuration.
+    Stringifier<Configuration> stringifier = 
+      new DefaultStringifier<Configuration>(jobConf, Configuration.class);
+    try {
+      jobConf.set(prefix + CHAIN_MAPPER_CONFIG + index, stringifier
+          .toString(new Configuration(mapperConf)));
+    } catch (IOException ioEx) {
+      throw new RuntimeException(ioEx);
+    }
+
+    // increment the chain counter
+    jobConf.setInt(prefix + CHAIN_MAPPER_SIZE, index + 1);
+  }
+
+  /**
+   * Sets the Reducer class to the chain job.
+   * 
+   * <p/>
+   * The configuration properties of the chain job have precedence over the
+   * configuration properties of the Reducer.
+   * 
+   * @param job
+   *          the chain job.
+   * @param klass
+   *          the Reducer class to add.
+   * @param inputKeyClass
+   *          reducer input key class.
+   * @param inputValueClass
+   *          reducer input value class.
+   * @param outputKeyClass
+   *          reducer output key class.
+   * @param outputValueClass
+   *          reducer output value class.
+   * @param reducerConf
+   *          a configuration for the Reducer class. It is recommended to use a
+   *          Configuration without default values using the
+   *          <code>Configuration(boolean loadDefaults)</code> constructor with
+   *          FALSE.
+   */
+  @SuppressWarnings("unchecked")
+  protected static void setReducer(Job job, Class<? extends Reducer> klass,
+      Class<?> inputKeyClass, Class<?> inputValueClass,
+      Class<?> outputKeyClass, Class<?> outputValueClass,
+      Configuration reducerConf) {
+    String prefix = getPrefix(false);
+    Configuration jobConf = job.getConfiguration();
+    checkReducerAlreadySet(false, jobConf, prefix, false);
+
+    jobConf.setClass(prefix + CHAIN_REDUCER_CLASS, klass, Reducer.class);
+
+    setReducerConf(jobConf, inputKeyClass, inputValueClass, outputKeyClass,
+        outputValueClass, reducerConf, prefix);
+  }
+
+  protected static void setReducerConf(Configuration jobConf,
+      Class<?> inputKeyClass, Class<?> inputValueClass,
+      Class<?> outputKeyClass, Class<?> outputValueClass,
+      Configuration reducerConf, String prefix) {
+    // if the Reducer does not have a Configuration, create an empty one
+    if (reducerConf == null) {
+      // using a Configuration without defaults to make it lightweight.
+      // still the chain's conf may have all defaults and this conf is
+      // overlapped to the chain's Configuration one.
+      reducerConf = new Configuration(false);
+    }
+
+    // store the input/output classes of the reducer in
+    // the reducer configuration
+    reducerConf.setClass(REDUCER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
+    reducerConf.setClass(REDUCER_INPUT_VALUE_CLASS, inputValueClass,
+        Object.class);
+    reducerConf
+        .setClass(REDUCER_OUTPUT_KEY_CLASS, outputKeyClass, Object.class);
+    reducerConf.setClass(REDUCER_OUTPUT_VALUE_CLASS, outputValueClass,
+        Object.class);
+
+    // serialize the reducer configuration in the chain's configuration.
+    Stringifier<Configuration> stringifier = 
+      new DefaultStringifier<Configuration>(jobConf, Configuration.class);
+    try {
+      jobConf.set(prefix + CHAIN_REDUCER_CONFIG, stringifier
+          .toString(new Configuration(reducerConf)));
+    } catch (IOException ioEx) {
+      throw new RuntimeException(ioEx);
+    }
+  }
+
+  /**
+   * Setup the chain.
+   * 
+   * @param jobConf
+   *          chain job's {@link Configuration}.
+   */
+  @SuppressWarnings("unchecked")
+  void setup(Configuration jobConf) {
+    String prefix = getPrefix(isMap);
+
+    int index = jobConf.getInt(prefix + CHAIN_MAPPER_SIZE, 0);
+    for (int i = 0; i < index; i++) {
+      Class<? extends Mapper> klass = jobConf.getClass(prefix
+          + CHAIN_MAPPER_CLASS + i, null, Mapper.class);
+      Configuration mConf = getChainElementConf(jobConf, prefix
+          + CHAIN_MAPPER_CONFIG + i);
+      confList.add(mConf);
+      Mapper mapper = ReflectionUtils.newInstance(klass, mConf);
+      mappers.add(mapper);
+
+    }
+
+    Class<? extends Reducer> klass = jobConf.getClass(prefix
+        + CHAIN_REDUCER_CLASS, null, Reducer.class);
+    if (klass != null) {
+      rConf = getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG);
+      reducer = ReflectionUtils.newInstance(klass, rConf);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  List<Mapper> getAllMappers() {
+    return mappers;
+  }
+
+  /**
+   * Returns the Reducer instance in the chain.
+   * 
+   * @return the Reducer instance in the chain or NULL if none.
+   */
+  Reducer<?, ?, ?, ?> getReducer() {
+    return reducer;
+  }
+  
+  /**
+   * Creates a ChainBlockingQueue with KeyValuePair as the element type.
+   * 
+   * @return the ChainBlockingQueue
+   */
+  ChainBlockingQueue<KeyValuePair<?, ?>> createBlockingQueue() {
+    return new ChainBlockingQueue<KeyValuePair<?, ?>>();
+  }
+
+  /**
+   * A blocking queue that holds at most one element.
+   *   
+   * @param <E>
+   */
+  class ChainBlockingQueue<E> {
+    E element = null;
+    boolean isInterrupted = false;
+    
+    ChainBlockingQueue() {
+      blockingQueues.add(this);
+    }
+
+    synchronized void enqueue(E e) throws InterruptedException {
+      while (element != null) {
+        if (isInterrupted) {
+          throw new InterruptedException();
+        }
+        this.wait();
+      }
+      element = e;
+      this.notify();
+    }
+
+    synchronized E dequeue() throws InterruptedException {
+      while (element == null) {
+        if (isInterrupted) {
+          throw new InterruptedException();
+        }
+        this.wait();
+      }
+      E e = element;
+      element = null;
+      this.notify();
+      return e;
+    }
+
+    synchronized void interrupt() {
+      isInterrupted = true;
+      this.notifyAll();
+    }
+  }
+}
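
The ChainBlockingQueue at the end of the class is a hand-rolled single-slot buffer: each adjacent pair of chain elements runs on its own thread (MapRunner/ReduceRunner above) and hands records across through one of these queues, with interrupt() used to unblock everything once any element fails. A standalone sketch of the same handoff discipline, with hypothetical names and not part of the commit:

    // SingleSlotQueue mirrors ChainBlockingQueue's semantics: enqueue blocks
    // until the slot is empty, dequeue blocks until it is full, and interrupt()
    // wakes all waiters so a failure in one thread stops the whole pipeline.
    public class SingleSlotQueue<E> {
      private E element = null;
      private boolean interrupted = false;

      public synchronized void enqueue(E e) throws InterruptedException {
        while (element != null) {
          if (interrupted) { throw new InterruptedException(); }
          wait();
        }
        element = e;
        notifyAll();
      }

      public synchronized E dequeue() throws InterruptedException {
        while (element == null) {
          if (interrupted) { throw new InterruptedException(); }
          wait();
        }
        E e = element;
        element = null;
        notifyAll();
        return e;
      }

      public synchronized void interrupt() {
        interrupted = true;
        notifyAll();
      }

      public static void main(String[] args) throws Exception {
        final SingleSlotQueue<String> q = new SingleSlotQueue<String>();
        Thread producer = new Thread(new Runnable() {
          public void run() {
            try {
              q.enqueue("record-1");
              q.enqueue("record-2");
              q.enqueue("<eof>"); // end-of-input marker, like KeyValuePair(true)
            } catch (InterruptedException ie) {
              // a peer was interrupted; just exit
            }
          }
        });
        producer.start();
        String r;
        while (!(r = q.dequeue()).equals("<eof>")) {
          System.out.println(r);
        }
        producer.join();
      }
    }

Each queue also registers itself with the owning Chain in its constructor, so interruptAllThreads() can reach every queue as well as every thread.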

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java?rev=896750&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java Thu Jan  7 04:06:55 2010
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.chain;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * A simple wrapper class that delegates most of its functionality to the
+ * underlying context, but overrides the methods to do with record readers,
+ * record writers and configuration.
+ */
+class ChainMapContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT> implements
+    MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+
+  private RecordReader<KEYIN, VALUEIN> reader;
+  private RecordWriter<KEYOUT, VALUEOUT> output;
+  private TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> base;
+  private Configuration conf;
+
+  ChainMapContextImpl(
+      TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> base,
+      RecordReader<KEYIN, VALUEIN> rr, RecordWriter<KEYOUT, VALUEOUT> rw,
+      Configuration conf) {
+    this.reader = rr;
+    this.output = rw;
+    this.base = base;
+    this.conf = conf;
+  }
+
+  @Override
+  public KEYIN getCurrentKey() throws IOException, InterruptedException {
+    return reader.getCurrentKey();
+  }
+
+  @Override
+  public VALUEIN getCurrentValue() throws IOException, InterruptedException {
+    return reader.getCurrentValue();
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    return reader.nextKeyValue();
+  }
+
+  @Override
+  public InputSplit getInputSplit() {
+    if (base instanceof MapContext) {
+      MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mc = 
+        (MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>) base;
+      return mc.getInputSplit();
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public Counter getCounter(Enum<?> counterName) {
+    return base.getCounter(counterName);
+  }
+
+  @Override
+  public Counter getCounter(String groupName, String counterName) {
+    return base.getCounter(groupName, counterName);
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter() {
+    return base.getOutputCommitter();
+  }
+
+  @Override
+  public void write(KEYOUT key, VALUEOUT value) throws IOException,
+      InterruptedException {
+    output.write(key, value);
+  }
+
+  @Override
+  public String getStatus() {
+    return base.getStatus();
+  }
+
+  @Override
+  public TaskAttemptID getTaskAttemptID() {
+    return base.getTaskAttemptID();
+  }
+
+  @Override
+  public void setStatus(String msg) {
+    base.setStatus(msg);
+  }
+
+  @Override
+  public Path[] getArchiveClassPaths() {
+    return base.getArchiveClassPaths();
+  }
+
+  @Override
+  public String[] getArchiveTimestamps() {
+    return base.getArchiveTimestamps();
+  }
+
+  @Override
+  public URI[] getCacheArchives() throws IOException {
+    return base.getCacheArchives();
+  }
+
+  @Override
+  public URI[] getCacheFiles() throws IOException {
+    return base.getCacheFiles();
+  }
+
+  @Override
+  public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass()
+      throws ClassNotFoundException {
+    return base.getCombinerClass();
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  @Override
+  public Path[] getFileClassPaths() {
+    return base.getFileClassPaths();
+  }
+
+  @Override
+  public String[] getFileTimestamps() {
+    return base.getFileTimestamps();
+  }
+
+  @Override
+  public RawComparator<?> getGroupingComparator() {
+    return base.getGroupingComparator();
+  }
+
+  @Override
+  public Class<? extends InputFormat<?, ?>> getInputFormatClass()
+      throws ClassNotFoundException {
+    return base.getInputFormatClass();
+  }
+
+  @Override
+  public String getJar() {
+    return base.getJar();
+  }
+
+  @Override
+  public JobID getJobID() {
+    return base.getJobID();
+  }
+
+  @Override
+  public String getJobName() {
+    return base.getJobName();
+  }
+
+  @Override
+  public boolean getJobSetupCleanupNeeded() {
+    return base.getJobSetupCleanupNeeded();
+  }
+
+  @Override
+  public Path[] getLocalCacheArchives() throws IOException {
+    return base.getLocalCacheArchives();
+  }
+
+  @Override
+  public Path[] getLocalCacheFiles() throws IOException {
+    return base.getLocalCacheFiles();
+  }
+
+  @Override
+  public Class<?> getMapOutputKeyClass() {
+    return base.getMapOutputKeyClass();
+  }
+
+  @Override
+  public Class<?> getMapOutputValueClass() {
+    return base.getMapOutputValueClass();
+  }
+
+  @Override
+  public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass()
+      throws ClassNotFoundException {
+    return base.getMapperClass();
+  }
+
+  @Override
+  public int getMaxMapAttempts() {
+    return base.getMaxMapAttempts();
+  }
+
+  @Override
+  public int getMaxReduceAttempts() {
+    return base.getMaxReduceAttempts();
+  }
+
+  @Override
+  public int getNumReduceTasks() {
+    return base.getNumReduceTasks();
+  }
+
+  @Override
+  public Class<? extends OutputFormat<?, ?>> getOutputFormatClass()
+      throws ClassNotFoundException {
+    return base.getOutputFormatClass();
+  }
+
+  @Override
+  public Class<?> getOutputKeyClass() {
+    return base.getOutputKeyClass();
+  }
+
+  @Override
+  public Class<?> getOutputValueClass() {
+    return base.getOutputValueClass();
+  }
+
+  @Override
+  public Class<? extends Partitioner<?, ?>> getPartitionerClass()
+      throws ClassNotFoundException {
+    return base.getPartitionerClass();
+  }
+
+  @Override
+  public boolean getProfileEnabled() {
+    return base.getProfileEnabled();
+  }
+
+  @Override
+  public String getProfileParams() {
+    return base.getProfileParams();
+  }
+
+  @Override
+  public IntegerRanges getProfileTaskRange(boolean isMap) {
+    return base.getProfileTaskRange(isMap);
+  }
+
+  @Override
+  public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass()
+      throws ClassNotFoundException {
+    return base.getReducerClass();
+  }
+
+  @Override
+  public RawComparator<?> getSortComparator() {
+    return base.getSortComparator();
+  }
+
+  @Override
+  public boolean getSymlink() {
+    return base.getSymlink();
+  }
+
+  @Override
+  public String getUser() {
+    return base.getUser();
+  }
+
+  @Override
+  public Path getWorkingDirectory() throws IOException {
+    return base.getWorkingDirectory();
+  }
+
+  @Override
+  public void progress() {
+    base.progress();
+  }
+
+}

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapper.java?rev=896750&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapper.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapper.java Thu Jan  7 04:06:55 2010
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.chain;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.chain.Chain.ChainBlockingQueue;
+
+/**
+ * The ChainMapper class allows the use of multiple Mapper classes within a
+ * single Map task.
+ * 
+ * <p>
+ * The Mapper classes are invoked in a chained (or piped) fashion: the output
+ * of the first becomes the input of the second, and so on until the last
+ * Mapper. The output of the last Mapper is written to the task's output.
+ * </p>
+ * <p>
+ * The key functionality of this feature is that the Mappers in the chain do not
+ * need to be aware that they are executed in a chain. This enables having
+ * reusable specialized Mappers that can be combined to perform composite
+ * operations within a single task.
+ * </p>
+ * <p>
+ * When creating chains, special care has to be taken that the key/values
+ * output by a Mapper are valid for the following Mapper in the chain. It is
+ * assumed that all Mappers and the Reducer in the chain use matching output
+ * and input key and value classes, as no conversion is done by the chaining
+ * code.
+ * </p>
+ * <p>
+ * Using the ChainMapper and the ChainReducer classes it is possible to
+ * compose Map/Reduce jobs that look like <code>[MAP+ / REDUCE MAP*]</code>.
+ * An immediate benefit of this pattern is a dramatic reduction in disk IO.
+ * </p>
+ * <p>
+ * IMPORTANT: There is no need to specify the output key/value classes for the
+ * ChainMapper; this is done by the addMapper for the last mapper in the chain.
+ * </p>
+ * ChainMapper usage pattern:
+ * <p/>
+ * 
+ * <pre>
+ * ...
+ * Job job = new Job(conf);
+ *
+ * Configuration mapAConf = new Configuration(false);
+ * ...
+ * ChainMapper.addMapper(job, AMap.class, LongWritable.class, Text.class,
+ *   Text.class, Text.class, mapAConf);
+ *
+ * Configuration mapBConf = new Configuration(false);
+ * ...
+ * ChainMapper.addMapper(job, BMap.class, Text.class, Text.class,
+ *   LongWritable.class, Text.class, mapBConf);
+ *
+ * ...
+ *
+ * job.waitForCompletion(true);
+ * ...
+ * </pre>
+ */
+public class ChainMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends
+    Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+
+  /**
+   * Adds a {@link Mapper} class to the chain mapper.
+   * 
+   * <p>
+   * The key and values are passed from one element of the chain to the next,
+   * by value. For the added Mapper, the configuration given for it,
+   * <code>mapperConf</code>, has precedence over the job's Configuration.
+   * This precedence is in effect when the task is running.
+   * </p>
+   * <p>
+   * IMPORTANT: There is no need to specify the output key/value classes for the
+   * ChainMapper; this is done by the addMapper for the last mapper in the chain.
+   * </p>
+   * 
+   * @param job
+   *          The job.
+   * @param klass
+   *          the Mapper class to add.
+   * @param inputKeyClass
+   *          mapper input key class.
+   * @param inputValueClass
+   *          mapper input value class.
+   * @param outputKeyClass
+   *          mapper output key class.
+   * @param outputValueClass
+   *          mapper output value class.
+   * @param mapperConf
+   *          a configuration for the Mapper class. It is recommended to use a
+   *          Configuration without default values using the
+   *          <code>Configuration(boolean loadDefaults)</code> constructor with
+   *          FALSE.
+   */
+  public static void addMapper(Job job, Class<? extends Mapper> klass,
+      Class<?> inputKeyClass, Class<?> inputValueClass,
+      Class<?> outputKeyClass, Class<?> outputValueClass,
+      Configuration mapperConf) throws IOException {
+    job.setMapperClass(ChainMapper.class);
+    job.setMapOutputKeyClass(outputKeyClass);
+    job.setMapOutputValueClass(outputValueClass);
+    Chain.addMapper(true, job, klass, inputKeyClass, inputValueClass,
+        outputKeyClass, outputValueClass, mapperConf);
+  }
+
+  private Chain chain;
+
+  protected void setup(Context context) {
+    chain = new Chain(true);
+    chain.setup(context.getConfiguration());
+  }
+
+  public void run(Context context) throws IOException, InterruptedException {
+
+    setup(context);
+
+    int numMappers = chain.getAllMappers().size();
+    if (numMappers == 0) {
+      return;
+    }
+
+    ChainBlockingQueue<Chain.KeyValuePair<?, ?>> inputqueue;
+    ChainBlockingQueue<Chain.KeyValuePair<?, ?>> outputqueue;
+    if (numMappers == 1) {
+      chain.runMapper(context, 0);
+    } else {
+      // add all the mappers with proper context
+      // add first mapper
+      outputqueue = chain.createBlockingQueue();
+      chain.addMapper(context, outputqueue, 0);
+      // add other mappers
+      for (int i = 1; i < numMappers - 1; i++) {
+        inputqueue = outputqueue;
+        outputqueue = chain.createBlockingQueue();
+        chain.addMapper(inputqueue, outputqueue, context, i);
+      }
+      // add last mapper
+      chain.addMapper(outputqueue, context, numMappers - 1);
+    }
+    
+    // start all threads
+    chain.startAllThreads();
+    
+    // wait for all threads
+    chain.joinAllThreads();
+  }
+}
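
To see the new API end to end, the usage pattern from the javadoc above
expands into a small driver. A minimal sketch, not part of this patch: it
chains two identity Mappers (the tests later in this commit use the same
trick), and assumes the input/output paths arrive as command-line arguments.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class ChainMapperDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setJobName("chain mapper example");
        job.setJarByClass(ChainMapperDriver.class);

        // Two chained identity mappers; a real job would substitute its own
        // Mapper subclasses (the AMap/BMap of the javadoc). Each stage's
        // input classes must equal the previous stage's output classes,
        // since the chain does no conversion.
        ChainMapper.addMapper(job, Mapper.class, LongWritable.class,
            Text.class, LongWritable.class, Text.class,
            new Configuration(false));
        ChainMapper.addMapper(job, Mapper.class, LongWritable.class,
            Text.class, LongWritable.class, Text.class,
            new Configuration(false));

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }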

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java?rev=896750&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java Thu Jan  7 04:06:55 2010
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.chain;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.ReduceContext;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * A simple wrapper class that delegates most of its functionality to the
+ * underlying context, but overrides the methods that deal with the record
+ * writer and the configuration.
+ */
+class ChainReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT> implements
+    ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+
+  private final ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> base;
+  private final RecordWriter<KEYOUT, VALUEOUT> rw;
+  private final Configuration conf;
+
+  public ChainReduceContextImpl(
+      ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> base,
+      RecordWriter<KEYOUT, VALUEOUT> output, Configuration conf) {
+    this.base = base;
+    this.rw = output;
+    this.conf = conf;
+  }
+
+  @Override
+  public Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
+    return base.getValues();
+  }
+
+  @Override
+  public boolean nextKey() throws IOException, InterruptedException {
+    return base.nextKey();
+  }
+
+  @Override
+  public Counter getCounter(Enum<?> counterName) {
+    return base.getCounter(counterName);
+  }
+
+  @Override
+  public Counter getCounter(String groupName, String counterName) {
+    return base.getCounter(groupName, counterName);
+  }
+
+  @Override
+  public KEYIN getCurrentKey() throws IOException, InterruptedException {
+    return base.getCurrentKey();
+  }
+
+  @Override
+  public VALUEIN getCurrentValue() throws IOException, InterruptedException {
+    return base.getCurrentValue();
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter() {
+    return base.getOutputCommitter();
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    return base.nextKeyValue();
+  }
+
+  @Override
+  public void write(KEYOUT key, VALUEOUT value) throws IOException,
+      InterruptedException {
+    rw.write(key, value);
+  }
+
+  @Override
+  public String getStatus() {
+    return base.getStatus();
+  }
+
+  @Override
+  public TaskAttemptID getTaskAttemptID() {
+    return base.getTaskAttemptID();
+  }
+
+  @Override
+  public void setStatus(String msg) {
+    base.setStatus(msg);
+  }
+
+  @Override
+  public Path[] getArchiveClassPaths() {
+    return base.getArchiveClassPaths();
+  }
+
+  @Override
+  public String[] getArchiveTimestamps() {
+    return base.getArchiveTimestamps();
+  }
+
+  @Override
+  public URI[] getCacheArchives() throws IOException {
+    return base.getCacheArchives();
+  }
+
+  @Override
+  public URI[] getCacheFiles() throws IOException {
+    return base.getCacheFiles();
+  }
+
+  @Override
+  public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass()
+      throws ClassNotFoundException {
+    return base.getCombinerClass();
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  @Override
+  public Path[] getFileClassPaths() {
+    return base.getFileClassPaths();
+  }
+
+  @Override
+  public String[] getFileTimestamps() {
+    return base.getFileTimestamps();
+  }
+
+  @Override
+  public RawComparator<?> getGroupingComparator() {
+    return base.getGroupingComparator();
+  }
+
+  @Override
+  public Class<? extends InputFormat<?, ?>> getInputFormatClass()
+      throws ClassNotFoundException {
+    return base.getInputFormatClass();
+  }
+
+  @Override
+  public String getJar() {
+    return base.getJar();
+  }
+
+  @Override
+  public JobID getJobID() {
+    return base.getJobID();
+  }
+
+  @Override
+  public String getJobName() {
+    return base.getJobName();
+  }
+
+  @Override
+  public boolean getJobSetupCleanupNeeded() {
+    return base.getJobSetupCleanupNeeded();
+  }
+
+  @Override
+  public Path[] getLocalCacheArchives() throws IOException {
+    return base.getLocalCacheArchives();
+  }
+
+  @Override
+  public Path[] getLocalCacheFiles() throws IOException {
+    return base.getLocalCacheFiles();
+  }
+
+  @Override
+  public Class<?> getMapOutputKeyClass() {
+    return base.getMapOutputKeyClass();
+  }
+
+  @Override
+  public Class<?> getMapOutputValueClass() {
+    return base.getMapOutputValueClass();
+  }
+
+  @Override
+  public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass()
+      throws ClassNotFoundException {
+    return base.getMapperClass();
+  }
+
+  @Override
+  public int getMaxMapAttempts() {
+    return base.getMaxMapAttempts();
+  }
+
+  @Override
+  public int getMaxReduceAttempts() {
+    return base.getMaxReduceAttempts();
+  }
+
+  @Override
+  public int getNumReduceTasks() {
+    return base.getNumReduceTasks();
+  }
+
+  @Override
+  public Class<? extends OutputFormat<?, ?>> getOutputFormatClass()
+      throws ClassNotFoundException {
+    return base.getOutputFormatClass();
+  }
+
+  @Override
+  public Class<?> getOutputKeyClass() {
+    return base.getOutputKeyClass();
+  }
+
+  @Override
+  public Class<?> getOutputValueClass() {
+    return base.getOutputValueClass();
+  }
+
+  @Override
+  public Class<? extends Partitioner<?, ?>> getPartitionerClass()
+      throws ClassNotFoundException {
+    return base.getPartitionerClass();
+  }
+
+  @Override
+  public boolean getProfileEnabled() {
+    return base.getProfileEnabled();
+  }
+
+  @Override
+  public String getProfileParams() {
+    return base.getProfileParams();
+  }
+
+  @Override
+  public IntegerRanges getProfileTaskRange(boolean isMap) {
+    return base.getProfileTaskRange(isMap);
+  }
+
+  @Override
+  public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass()
+      throws ClassNotFoundException {
+    return base.getReducerClass();
+  }
+
+  @Override
+  public RawComparator<?> getSortComparator() {
+    return base.getSortComparator();
+  }
+
+  @Override
+  public boolean getSymlink() {
+    return base.getSymlink();
+  }
+
+  @Override
+  public String getUser() {
+    return base.getUser();
+  }
+
+  @Override
+  public Path getWorkingDirectory() throws IOException {
+    return base.getWorkingDirectory();
+  }
+
+  @Override
+  public void progress() {
+    base.progress();
+  }
+}
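
The value of this wrapper, as with ChainMapContextImpl, is that the chain can
swap in its own RecordWriter and Configuration per stage while every other
call still reaches the real framework context. The patch's Chain class does
this with its internal ChainBlockingQueue and KeyValuePair types; purely as an
illustrative sketch under that assumption, a queue-backed RecordWriter might
look like:

    import java.io.IOException;
    import java.util.concurrent.BlockingQueue;

    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    // Hypothetical sketch (not the patch's actual mechanism): routes records
    // into a queue consumed by the next mapper in the chain, instead of
    // writing them to the task's real output.
    class QueueRecordWriter<K, V> extends RecordWriter<K, V> {
      private final BlockingQueue<Object[]> queue;

      QueueRecordWriter(BlockingQueue<Object[]> queue) {
        this.queue = queue;
      }

      @Override
      public void write(K key, V value) throws IOException,
          InterruptedException {
        queue.put(new Object[] { key, value }); // blocks if the consumer lags
      }

      @Override
      public void close(TaskAttemptContext context) throws IOException,
          InterruptedException {
        // nothing to flush; the queue is drained by the next stage
      }
    }

Constructed as new ChainReduceContextImpl(base, new QueueRecordWriter<K2,
V2>(queue), stageConf), the wrapped context reports stageConf from
getConfiguration() and routes write() into the queue, while everything else
delegates to base.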

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReducer.java?rev=896750&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReducer.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReducer.java Thu Jan  7 04:06:55 2010
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.chain;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.chain.Chain.ChainBlockingQueue;
+
+import java.io.IOException;
+
+/**
+ * The ChainReducer class allows chaining multiple Mapper classes after a
+ * Reducer within the Reducer task.
+ * 
+ * <p>
+ * For each record output by the Reducer, the Mapper classes are invoked in a
+ * chained (or piped) fashion: the output of the Reducer becomes the input of
+ * the first Mapper, the output of the first becomes the input of the second,
+ * and so on until the last Mapper. The output of the last Mapper is written
+ * to the task's output.
+ * </p>
+ * <p>
+ * The key functionality of this feature is that the Mappers in the chain do not
+ * need to be aware that they are executed after the Reducer or in a chain. This
+ * enables having reusable specialized Mappers that can be combined to perform
+ * composite operations within a single task.
+ * </p>
+ * <p>
+ * When creating chains, special care has to be taken that the key/values
+ * output by a Mapper are valid for the following Mapper in the chain. It is
+ * assumed that all Mappers and the Reducer in the chain use matching output
+ * and input key and value classes, as no conversion is done by the chaining
+ * code.
+ * </p>
+ * <p>
+ * Using the ChainMapper and the ChainReducer classes it is possible to
+ * compose Map/Reduce jobs that look like <code>[MAP+ / REDUCE MAP*]</code>.
+ * An immediate benefit of this pattern is a dramatic reduction in disk IO.
+ * </p>
+ * <p>
+ * IMPORTANT: There is no need to specify the output key/value classes for the
+ * ChainReducer; this is done by the setReducer or the addMapper for the last
+ * element in the chain.
+ * </p>
+ * ChainReducer usage pattern:
+ * <p/>
+ * 
+ * <pre>
+ * ...
+ * Job job = new Job(conf);
+ * ...
+ *
+ * Configuration reduceConf = new Configuration(false);
+ * ...
+ * ChainReducer.setReducer(job, XReduce.class, LongWritable.class, Text.class,
+ *   Text.class, Text.class, reduceConf);
+ *
+ * ChainReducer.addMapper(job, CMap.class, Text.class, Text.class,
+ *   LongWritable.class, Text.class, null);
+ *
+ * ChainReducer.addMapper(job, DMap.class, LongWritable.class, Text.class,
+ *   LongWritable.class, LongWritable.class, null);
+ *
+ * ...
+ *
+ * job.waitForCompletion(true);
+ * ...
+ * </pre>
+ */
+public class ChainReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends
+    Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+
+  /**
+   * Sets the {@link Reducer} class for the chain job.
+   * 
+   * <p>
+   * The key and values are passed from one element of the chain to the next,
+   * by value. For the added Reducer, the configuration given for it,
+   * <code>reducerConf</code>, has precedence over the job's Configuration.
+   * This precedence is in effect when the task is running.
+   * </p>
+   * <p>
+   * IMPORTANT: There is no need to specify the output key/value classes for the
+   * ChainReducer; this is done by the setReducer or the addMapper for the last
+   * element in the chain.
+   * </p>
+   * 
+   * @param job
+   *          the job
+   * @param klass
+   *          the Reducer class to add.
+   * @param inputKeyClass
+   *          reducer input key class.
+   * @param inputValueClass
+   *          reducer input value class.
+   * @param outputKeyClass
+   *          reducer output key class.
+   * @param outputValueClass
+   *          reducer output value class.
+   * @param reducerConf
+   *          a configuration for the Reducer class. It is recommended to use a
+   *          Configuration without default values using the
+   *          <code>Configuration(boolean loadDefaults)</code> constructor with
+   *          FALSE.
+   */
+  public static void setReducer(Job job, Class<? extends Reducer> klass,
+      Class<?> inputKeyClass, Class<?> inputValueClass,
+      Class<?> outputKeyClass, Class<?> outputValueClass,
+      Configuration reducerConf) {
+    job.setReducerClass(ChainReducer.class);
+    job.setOutputKeyClass(outputKeyClass);
+    job.setOutputValueClass(outputValueClass);
+    Chain.setReducer(job, klass, inputKeyClass, inputValueClass,
+        outputKeyClass, outputValueClass, reducerConf);
+  }
+
+  /**
+   * Adds a {@link Mapper} class to the chain reducer.
+   * 
+   * <p>
+   * The key and values are passed from one element of the chain to the next,
+   * by value. For the added Mapper, the configuration given for it,
+   * <code>mapperConf</code>, has precedence over the job's Configuration.
+   * This precedence is in effect when the task is running.
+   * </p>
+   * <p>
+   * IMPORTANT: There is no need to specify the output key/value classes for the
+   * ChainMapper; this is done by the addMapper for the last mapper in the
+   * chain.
+   * </p>
+   * 
+   * @param job
+   *          The job.
+   * @param klass
+   *          the Mapper class to add.
+   * @param inputKeyClass
+   *          mapper input key class.
+   * @param inputValueClass
+   *          mapper input value class.
+   * @param outputKeyClass
+   *          mapper output key class.
+   * @param outputValueClass
+   *          mapper output value class.
+   * @param mapperConf
+   *          a configuration for the Mapper class. It is recommended to use a
+   *          Configuration without default values using the
+   *          <code>Configuration(boolean loadDefaults)</code> constructor with
+   *          FALSE.
+   */
+  public static void addMapper(Job job, Class<? extends Mapper> klass,
+      Class<?> inputKeyClass, Class<?> inputValueClass,
+      Class<?> outputKeyClass, Class<?> outputValueClass,
+      Configuration mapperConf) throws IOException {
+    job.setOutputKeyClass(outputKeyClass);
+    job.setOutputValueClass(outputValueClass);
+    Chain.addMapper(false, job, klass, inputKeyClass, inputValueClass,
+        outputKeyClass, outputValueClass, mapperConf);
+  }
+
+  private Chain chain;
+
+  protected void setup(Context context) {
+    chain = new Chain(false);
+    chain.setup(context.getConfiguration());
+  }
+
+  public void run(Context context) throws IOException, InterruptedException {
+    setup(context);
+
+    // if no reducer is set, just do nothing
+    if (chain.getReducer() == null) {
+      return;
+    }
+    int numMappers = chain.getAllMappers().size();
+    // if there are no mappers in chain, run the reducer
+    if (numMappers == 0) {
+      chain.runReducer(context);
+      return;
+    }
+
+    // add reducer and all mappers with proper context
+    ChainBlockingQueue<Chain.KeyValuePair<?, ?>> inputqueue;
+    ChainBlockingQueue<Chain.KeyValuePair<?, ?>> outputqueue;
+    // add reducer
+    outputqueue = chain.createBlockingQueue();
+    chain.addReducer(context, outputqueue);
+    // add all mappers except last one
+    for (int i = 0; i < numMappers - 1; i++) {
+      inputqueue = outputqueue;
+      outputqueue = chain.createBlockingQueue();
+      chain.addMapper(inputqueue, outputqueue, context, i);
+    }
+    // add last mapper
+    chain.addMapper(outputqueue, context, numMappers - 1);
+
+    // start all threads
+    chain.startAllThreads();
+    
+    // wait for all threads
+    chain.joinAllThreads();
+  }
+}
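
For completeness, the reduce-side half of a [MAP+ / REDUCE MAP*] job is
declared the same way before submission. A minimal sketch using the identity
Reducer and Mapper classes (the tests below rely on the same trick); every
stage keeps (LongWritable, Text) types so the no-conversion rule quoted in the
javadoc holds.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;

    public class ReduceChainConfig {
      // Wires the [REDUCE MAP*] half of the pipeline onto an existing job;
      // a real job would substitute its own Reducer/Mapper subclasses.
      public static void configureReduceChain(Job job) throws Exception {
        // Reducer first; its output classes feed the first post-reduce mapper.
        ChainReducer.setReducer(job, Reducer.class, LongWritable.class,
            Text.class, LongWritable.class, Text.class,
            new Configuration(false));
        // Post-reduce mappers; each input type matches the previous stage's
        // output type. A null Configuration means no per-stage overrides.
        ChainReducer.addMapper(job, Mapper.class, LongWritable.class,
            Text.class, LongWritable.class, Text.class, null);
        ChainReducer.addMapper(job, Mapper.class, LongWritable.class,
            Text.class, LongWritable.class, Text.class, null);
      }
    }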

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestChainErrors.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestChainErrors.java?rev=896750&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestChainErrors.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/chain/TestChainErrors.java Thu Jan  7 04:06:55 2010
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.chain;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * Tests error conditions in ChainMapper/ChainReducer.
+ */
+public class TestChainErrors extends HadoopTestCase {
+
+  private static String localPathRoot = System.getProperty("test.build.data",
+      "/tmp");
+
+  public TestChainErrors() throws IOException {
+    super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
+  }
+
+  private Path inDir = new Path(localPathRoot, "testing/chain/input");
+  private Path outDir = new Path(localPathRoot, "testing/chain/output");
+  private String input = "a\nb\nc\nd\n";
+
+  /**
+   * Tests errors during submission.
+   * 
+   * @throws Exception
+   */
+  public void testChainSubmission() throws Exception {
+
+    Configuration conf = createJobConf();
+
+    Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 0, 0, input);
+    job.setJobName("chain");
+
+    Throwable th = null;
+    // output key/value classes of the first map do not match the
+    // input classes of the second map
+    try {
+      ChainMapper.addMapper(job, Mapper.class, LongWritable.class, Text.class,
+          IntWritable.class, Text.class, null);
+      ChainMapper.addMapper(job, Mapper.class, LongWritable.class, Text.class,
+          LongWritable.class, Text.class, null);
+    } catch (IllegalArgumentException iae) {
+      th = iae;
+    }
+    assertNotNull(th);
+
+    th = null;
+    // output key/value classes of the reducer do not match the
+    // input classes of the mapper in the chain
+    try {
+      ChainReducer.setReducer(job, Reducer.class, LongWritable.class,
+          Text.class, IntWritable.class, Text.class, null);
+      ChainMapper.addMapper(job, Mapper.class, LongWritable.class, Text.class,
+          LongWritable.class, Text.class, null);
+    } catch (IllegalArgumentException iae) {
+      th = iae;
+    }
+    assertNotNull(th);
+  }
+
+  /**
+   * Tests one of the mappers throwing an exception.
+   * 
+   * @throws Exception
+   */
+  public void testChainFail() throws Exception {
+
+    Configuration conf = createJobConf();
+
+    Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 0, input);
+    job.setJobName("chain");
+
+    ChainMapper.addMapper(job, Mapper.class, LongWritable.class, Text.class,
+        LongWritable.class, Text.class, null);
+
+    ChainMapper.addMapper(job, FailMap.class, LongWritable.class, Text.class,
+        IntWritable.class, Text.class, null);
+
+    ChainMapper.addMapper(job, Mapper.class, IntWritable.class, Text.class,
+        LongWritable.class, Text.class, null);
+
+    job.waitForCompletion(true);
+    assertFalse("Job did not fail", job.isSuccessful());
+  }
+
+  /**
+   * Tests the Reducer throwing an exception.
+   * 
+   * @throws Exception
+   */
+  public void testReducerFail() throws Exception {
+
+    Configuration conf = createJobConf();
+
+    Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 1, input);
+    job.setJobName("chain");
+
+    ChainMapper.addMapper(job, Mapper.class, LongWritable.class, Text.class,
+        LongWritable.class, Text.class, null);
+
+    ChainReducer.setReducer(job, FailReduce.class, LongWritable.class,
+        Text.class, LongWritable.class, Text.class, null);
+
+    ChainReducer.addMapper(job, Mapper.class, LongWritable.class, Text.class,
+        LongWritable.class, Text.class, null);
+
+    job.waitForCompletion(true);
+    assertFalse("Job did not fail", job.isSuccessful());
+  }
+
+  /**
+   * Tests one of the maps consuming all input and producing no output.
+   * 
+   * @throws Exception
+   */
+  public void testChainMapNoOutput() throws Exception {
+    Configuration conf = createJobConf();
+    String expectedOutput = "";
+
+    Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 0, input);
+    job.setJobName("chain");
+
+    ChainMapper.addMapper(job, ConsumeMap.class, IntWritable.class, Text.class,
+        LongWritable.class, Text.class, null);
+
+    ChainMapper.addMapper(job, Mapper.class, LongWritable.class, Text.class,
+        LongWritable.class, Text.class, null);
+
+    job.waitForCompletion(true);
+    assertTrue("Job failed", job.isSuccessful());
+    assertEquals("Outputs don't match", expectedOutput, MapReduceTestUtil
+        .readOutput(outDir, conf));
+  }
+
+  /**
+   * Tests the reducer consuming all input and producing no output.
+   * 
+   * @throws Exception
+   */
+  public void testChainReduceNoOutput() throws Exception {
+    Configuration conf = createJobConf();
+    String expectedOutput = "";
+
+    Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 1, input);
+    job.setJobName("chain");
+
+    ChainMapper.addMapper(job, Mapper.class, IntWritable.class, Text.class,
+        LongWritable.class, Text.class, null);
+
+    ChainReducer.setReducer(job, ConsumeReduce.class, LongWritable.class,
+        Text.class, LongWritable.class, Text.class, null);
+
+    ChainReducer.addMapper(job, Mapper.class, LongWritable.class, Text.class,
+        LongWritable.class, Text.class, null);
+
+    job.waitForCompletion(true);
+    assertTrue("Job failed", job.isSuccessful());
+    assertEquals("Outputs don't match", expectedOutput, MapReduceTestUtil
+        .readOutput(outDir, conf));
+  }
+
+  // this map consumes all the input and outputs nothing
+  public static class ConsumeMap extends
+      Mapper<LongWritable, Text, LongWritable, Text> {
+    public void map(LongWritable key, Text value, Context context)
+        throws IOException, InterruptedException {
+    }
+  }
+
+  // this reduce consumes all the input and outputs nothing
+  public static class ConsumeReduce extends
+      Reducer<LongWritable, Text, LongWritable, Text> {
+    public void reduce(LongWritable key, Iterable<Text> values, Context context)
+        throws IOException, InterruptedException {
+    }
+  }
+
+  // this map throws IOException for input value "b"
+  public static class FailMap extends
+      Mapper<LongWritable, Text, IntWritable, Text> {
+    protected void map(LongWritable key, Text value, Context context)
+        throws IOException, InterruptedException {
+      if (value.toString().equals("b")) {
+        throw new IOException();
+      }
+    }
+  }
+
+  // this reduce throws IOException for any input
+  public static class FailReduce extends
+      Reducer<LongWritable, Text, LongWritable, Text> {
+    public void reduce(LongWritable key, Iterable<Text> values, Context context)
+        throws IOException, InterruptedException {
+      throw new IOException();
+    }
+  }
+}
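
These tests run against the local job runner and local filesystem via
HadoopTestCase. Assuming the standard trunk ant build, a single test class
can be exercised with:

    ant test -Dtestcase=TestChainErrors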


