hadoop-mapreduce-commits mailing list archives

From: acmur...@apache.org
Subject: svn commit: r816816 [1/2] - in /hadoop/mapreduce/trunk: ./ src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/ src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/ src/contrib/vertica/src/test/org/apache/hadoop/vertica/ src/...
Date: Fri, 18 Sep 2009 23:52:58 GMT
Author: acmurthy
Date: Fri Sep 18 23:52:56 2009
New Revision: 816816

URL: http://svn.apache.org/viewvc?rev=816816&view=rev
Log:
MAPREDUCE-954. Change Map-Reduce context objects to be interfaces. (acmurthy)
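
The thrust of the change: the public context types (JobContext, TaskAttemptContext, MapContext, ReduceContext, TaskInputOutputContext) become pure interfaces, with new *Impl classes under org.apache.hadoop.mapreduce.task carrying the state. A minimal construction sketch against the post-commit tree; the Configuration and IDs are placeholders, not taken from this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.task.JobContextImpl;
    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

    public class ContextConstructionSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // JobContext is now an interface; JobContextImpl is the concrete view.
        JobContext jobContext = new JobContextImpl(conf, new JobID("sketch", 0));
        // Likewise for TaskAttemptContext.
        TaskAttemptContext taskContext =
            new TaskAttemptContextImpl(conf, new TaskAttemptID());
        System.out.println(jobContext.getNumReduceTasks());
        System.out.println(taskContext.getTaskAttemptID());
      }
    }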

Added:
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockMapContext.java
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContext.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobContextImpl.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskAttemptContextImpl.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/MapContextImpl.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/ReduceContextImpl.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/TaskInputOutputContextImpl.java
Removed:
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockMapContextWrapper.java
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContextWrapper.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
    hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestVertica.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobContext.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskAttemptContext.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MapContext.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Mapper.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ReduceContext.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Reducer.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskInputOutputContext.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/Parser.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java
    hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRKeyValueTextInputFormat.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsBinaryInputFormat.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsTextInputFormat.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileInputFilter.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestJoinProperties.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestWrappedRRClassloader.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Sep 18 23:52:56 2009
@@ -405,6 +405,9 @@
 
     MAPREDUCE-1011. Add build.properties to svn and git ignore. (omalley)
 
+    MAPREDUCE-954. Change Map-Reduce context objects to be interfaces.
+    (acmurthy) 
+
   BUG FIXES
 
     MAPREDUCE-878. Rename fair scheduler design doc to 

Modified: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/MapDriver.java Fri Sep 18 23:52:56 2009
@@ -25,11 +25,11 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
 import org.apache.hadoop.mrunit.MapDriverBase;
-import org.apache.hadoop.mrunit.mapreduce.mock.MockMapContextWrapper;
+import org.apache.hadoop.mrunit.mapreduce.mock.MockMapContext;
 import org.apache.hadoop.mrunit.types.Pair;
 
 /**
@@ -192,11 +192,10 @@
     inputs.add(new Pair<K1, V1>(inputKey, inputVal));
 
     try {
-      MockMapContextWrapper<K1, V1, K2, V2> wrapper = new MockMapContextWrapper();
-      MockMapContextWrapper<K1, V1, K2, V2>.MockMapContext context =
-          wrapper.getMockContext(inputs, getCounters());
+      MockMapContext<K1, V1, K2, V2> context =
+        new MockMapContext<K1, V1, K2, V2>(inputs, getCounters());
 
-      myMapper.run(context);
+      myMapper.run(new WrappedMapper<K1, V1, K2, V2>().getMapContext(context));
       return context.getOutputs();
     } catch (InterruptedException ie) {
       throw new IOException(ie);

Modified: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/ReduceDriver.java Fri Sep 18 23:52:56 2009
@@ -25,11 +25,11 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
 import org.apache.hadoop.mrunit.ReduceDriverBase;
-import org.apache.hadoop.mrunit.mapreduce.mock.MockReduceContextWrapper;
+import org.apache.hadoop.mrunit.mapreduce.mock.MockReduceContext;
 import org.apache.hadoop.mrunit.types.Pair;
 
 /**
@@ -197,11 +197,9 @@
     inputs.add(new Pair<K1, List<V1>>(inputKey, inputValues));
 
     try {
-      MockReduceContextWrapper<K1, V1, K2, V2> wrapper = new MockReduceContextWrapper();
-      MockReduceContextWrapper<K1, V1, K2, V2>.MockReduceContext context =
-          wrapper.getMockContext(inputs, getCounters());
-
-      myReducer.run(context);
+      MockReduceContext<K1, V1, K2, V2> context = 
+        new MockReduceContext<K1, V1, K2, V2>(inputs, getCounters());
+      myReducer.run(new WrappedReducer<K1, V1, K2, V2>().getReducerContext(context));
       return context.getOutputs();
     } catch (InterruptedException ie) {
       throw new IOException(ie);

Added: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockMapContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockMapContext.java?rev=816816&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockMapContext.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockMapContext.java Fri Sep 18 23:52:56 2009
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mrunit.mapreduce.mock;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
+import org.apache.hadoop.mrunit.mock.MockOutputCollector;
+import org.apache.hadoop.mrunit.types.Pair;
+
+public class MockMapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
+    extends MapContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+
+  private Iterator<Pair<KEYIN, VALUEIN>> inputIter;
+  private Pair<KEYIN, VALUEIN> curInput;
+  private MockOutputCollector<KEYOUT, VALUEOUT> output;
+  
+  public MockMapContext(final List<Pair<KEYIN, VALUEIN>> in, 
+                        final Counters counters) {
+    super(new Configuration(),
+        new TaskAttemptID("mrunit-jt", 0, TaskType.MAP, 0, 0),
+        null, null, new MockOutputCommitter(), new MockReporter(counters), null);
+    this.inputIter = in.iterator();
+    this.output = new MockOutputCollector<KEYOUT, VALUEOUT>();
+  }
+
+  @Override
+  public InputSplit getInputSplit() {
+    return new MockInputSplit();
+  }
+
+  @Override
+  public KEYIN getCurrentKey() {
+    return curInput.getFirst();
+  }
+
+  @Override
+  public VALUEIN getCurrentValue() {
+    return curInput.getSecond();
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException {
+    if (this.inputIter.hasNext()) {
+      this.curInput = this.inputIter.next();
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  public void write(KEYOUT key, VALUEOUT value) throws IOException {
+    output.collect(key, value);
+  }
+
+  @Override
+  /** This method does nothing in the mock version. */
+  public void progress() {
+  }
+
+  @Override
+  /** This method does nothing in the mock version. */
+  public void setStatus(String status) {
+  }
+
+  /**
+   * @return the outputs from the MockOutputCollector back to
+   * the test harness.
+   */
+  public List<Pair<KEYOUT, VALUEOUT>> getOutputs() {
+    return output.getOutputs();
+  }
+}
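
The constructor signature and getOutputs() above are everything a test needs. A hedged sketch of driving a mapper through the mock by hand, mirroring what MapDriver.run() now does; the stock identity Mapper and the fresh Counters object are assumptions for illustration:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
    import org.apache.hadoop.mrunit.mapreduce.mock.MockMapContext;
    import org.apache.hadoop.mrunit.types.Pair;

    public class MockMapContextSketch {
      public static void main(String[] args) throws Exception {
        List<Pair<Text, Text>> inputs = new ArrayList<Pair<Text, Text>>();
        inputs.add(new Pair<Text, Text>(new Text("k1"), new Text("v1")));

        MockMapContext<Text, Text, Text, Text> context =
            new MockMapContext<Text, Text, Text, Text>(inputs, new Counters());

        // The stock Mapper is the identity map, so the collected output
        // should mirror the input pair fed in above.
        new Mapper<Text, Text, Text, Text>().run(
            new WrappedMapper<Text, Text, Text, Text>().getMapContext(context));

        for (Pair<Text, Text> out : context.getOutputs()) {
          System.out.println(out.getFirst() + "\t" + out.getSecond());
        }
      }
    }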

Added: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContext.java?rev=816816&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContext.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReduceContext.java Fri Sep 18 23:52:56 2009
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mrunit.mapreduce.mock;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
+import org.apache.hadoop.mrunit.mock.MockOutputCollector;
+import org.apache.hadoop.mrunit.types.Pair;
+
+public class MockReduceContext <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
+    extends ReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+
+  // The iterator over the input key, list(val).
+  private Iterator<Pair<KEYIN, List<VALUEIN>>> inputIter;
+
+  // The current key and list of values.
+  private KEYIN curKey;
+  private InspectableIterable curValueIterable;
+
+  private MockOutputCollector<KEYOUT, VALUEOUT> output;
+
+  public MockReduceContext(final List<Pair<KEYIN, List<VALUEIN>>> in, 
+                           final Counters counters) 
+  throws IOException, InterruptedException {
+    super(new Configuration(),
+          new TaskAttemptID("mrunit-jt", 0, TaskType.REDUCE, 0, 0),
+          new MockRawKeyValueIterator(), null, null, null,
+          new MockOutputCommitter(), new MockReporter(counters), null,
+          (Class) Text.class, (Class) Text.class);
+    this.inputIter = in.iterator();
+    this.output = new MockOutputCollector<KEYOUT, VALUEOUT>();
+  }
+
+
+  /**
+   * A private iterable/iterator implementation that wraps around the 
+   * underlying iterable/iterator used by the input value list. This
+   * memorizes the last value we saw so that we can return it in getCurrentValue().
+   */
+  private class InspectableIterable implements Iterable<VALUEIN> {
+    private Iterable<VALUEIN> base;
+    private VALUEIN lastVal;
+
+    public InspectableIterable(final Iterable<VALUEIN> baseCollection) {
+      this.base = baseCollection;
+    }
+
+    public Iterator<VALUEIN> iterator() {
+      return new InspectableIterator(this.base.iterator());
+    }
+
+    public VALUEIN getLastVal() {
+      return lastVal;
+    }
+
+    private class InspectableIterator 
+        extends ReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.ValueIterator
+        implements Iterator<VALUEIN> {
+      private Iterator<VALUEIN> iter;
+      public InspectableIterator(final Iterator<VALUEIN> baseIter) {
+        iter = baseIter;
+      }
+
+      public VALUEIN next() {
+        InspectableIterable.this.lastVal = iter.next();
+        return InspectableIterable.this.lastVal;
+      }
+
+      public boolean hasNext() {
+        return iter.hasNext();
+      }
+
+      public void remove() {
+        iter.remove();
+      }
+    }
+  }
+
+  @Override
+  public boolean nextKey() {
+    if (inputIter.hasNext()) {
+      // Advance to the next key and list of values
+      Pair<KEYIN, List<VALUEIN>> p = inputIter.next();
+      curKey = p.getFirst();
+
+      // Reset the value iterator
+      curValueIterable = new InspectableIterable(p.getSecond());
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean nextKeyValue() {
+    return nextKey();
+  }
+
+  @Override
+  public KEYIN getCurrentKey() {
+    return curKey;
+  }
+
+  @Override
+  public VALUEIN getCurrentValue() {
+    return curValueIterable.getLastVal();
+  }
+
+  @Override
+  public Iterable<VALUEIN> getValues() {
+    return curValueIterable;
+  }
+
+  public void write(KEYOUT key, VALUEOUT value) throws IOException {
+    output.collect(key, value);
+  }
+
+  @Override
+  /** This method does nothing in the mock version. */
+  public void progress() {
+  }
+
+  @Override
+  /** This method does nothing in the mock version. */
+  public void setStatus(String status) {
+  }
+
+  /**
+   * @return the outputs from the MockOutputCollector back to
+   * the test harness.
+   */
+  public List<Pair<KEYOUT, VALUEOUT>> getOutputs() {
+    return output.getOutputs();
+  }
+}
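
The InspectableIterable/InspectableIterator pair is the load-bearing trick here: getCurrentValue() must return whichever element the reducer's for-each loop is currently holding, so the iterator records each value as it hands it out. The same idiom in isolation, as a self-contained sketch with illustrative names:

    import java.util.Arrays;
    import java.util.Iterator;

    public class LastValueIterable<T> implements Iterable<T> {
      private final Iterable<T> base;
      private T lastVal;

      public LastValueIterable(Iterable<T> base) {
        this.base = base;
      }

      /** The element most recently returned by any iterator of this iterable. */
      public T getLastVal() {
        return lastVal;
      }

      public Iterator<T> iterator() {
        final Iterator<T> it = base.iterator();
        return new Iterator<T>() {
          public boolean hasNext() { return it.hasNext(); }
          public T next() { lastVal = it.next(); return lastVal; }
          public void remove() { it.remove(); }
        };
      }

      public static void main(String[] args) {
        LastValueIterable<String> vals =
            new LastValueIterable<String>(Arrays.asList("a", "b"));
        for (String s : vals) {
          // getLastVal() tracks the element the loop is holding.
          if (!s.equals(vals.getLastVal())) throw new AssertionError();
        }
      }
    }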

Modified: hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestVertica.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestVertica.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestVertica.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestVertica.java Fri Sep 18 23:52:56 2009
@@ -40,8 +40,10 @@
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobContextImpl;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.vertica.VerticaConfiguration;
 import org.apache.hadoop.vertica.VerticaInputFormat;
@@ -232,7 +234,7 @@
     VerticaInputSplit input = getVerticaSplit(false);
     VerticaRecordReader reader = new VerticaRecordReader(input, input
         .getConfiguration());
-    TaskAttemptContext context = new TaskAttemptContext(input
+    TaskAttemptContext context = new TaskAttemptContextImpl(input
         .getConfiguration(), new TaskAttemptID());
     reader.initialize(input, context);
 
@@ -254,7 +256,7 @@
 
     Configuration conf = job.getConfiguration();
     conf.setInt("mapred.map.tasks", 1);
-    JobContext context = new JobContext(conf, new JobID());
+    JobContext context = new JobContextImpl(conf, new JobID());
 
     splits = input.getSplits(context);
     assert splits.size() == 1;
@@ -308,7 +310,7 @@
         "c char(1)", "d date", "f float", "t timestamp", "v varchar",
         "z varbinary");
     output.checkOutputSpecs(job, true);
-    TaskAttemptContext context = new TaskAttemptContext(job.getConfiguration(),
+    TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(),
         new TaskAttemptID());
     VerticaRecordWriter writer = (VerticaRecordWriter) output
         .getRecordWriter(context);

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputFormat.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileOutputFormat.java Fri Sep 18 23:52:56 2009
@@ -235,8 +235,10 @@
 
     OutputCommitter committer = conf.getOutputCommitter();
     Path workPath = outputPath;
-    TaskAttemptContext context = new TaskAttemptContext(conf,
-                TaskAttemptID.forName(conf.get(JobContext.TASK_ATTEMPT_ID)));
+    TaskAttemptContext context = 
+      new TaskAttemptContextImpl(conf,
+                                 TaskAttemptID.forName(conf.get(
+                                     JobContext.TASK_ATTEMPT_ID)));
     if (committer instanceof FileOutputCommitter) {
       workPath = ((FileOutputCommitter)committer).getWorkPath(context,
                                                               outputPath);

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobContext.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobContext.java Fri Sep 18 23:52:56 2009
@@ -23,36 +23,18 @@
  * @deprecated Use {@link org.apache.hadoop.mapreduce.JobContext} instead.
  */
 @Deprecated
-public class JobContext extends org.apache.hadoop.mapreduce.JobContext {
-  private JobConf job;
-  private Progressable progress;
-
-  JobContext(JobConf conf, org.apache.hadoop.mapreduce.JobID jobId, 
-             Progressable progress) {
-    super(conf, jobId);
-    this.job = conf;
-    this.progress = progress;
-  }
-
-  JobContext(JobConf conf, org.apache.hadoop.mapreduce.JobID jobId) {
-    this(conf, jobId, Reporter.NULL);
-  }
-  
+public interface JobContext extends org.apache.hadoop.mapreduce.JobContext {
   /**
    * Get the job Configuration
    * 
    * @return JobConf
    */
-  public JobConf getJobConf() {
-    return job;
-  }
+  public JobConf getJobConf();
   
   /**
    * Get the progress mechanism for reporting progress.
    * 
    * @return progress mechanism 
    */
-  public Progressable getProgressible() {
-    return progress;
-  }
+  public Progressable getProgressible();
 }

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobContextImpl.java?rev=816816&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobContextImpl.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobContextImpl.java Fri Sep 18 23:52:56 2009
@@ -0,0 +1,60 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.JobContext} instead.
+ */
+@Deprecated
+public class JobContextImpl 
+    extends org.apache.hadoop.mapreduce.task.JobContextImpl 
+    implements JobContext {
+  private JobConf job;
+  private Progressable progress;
+
+  JobContextImpl(JobConf conf, org.apache.hadoop.mapreduce.JobID jobId, 
+                 Progressable progress) {
+    super(conf, jobId);
+    this.job = conf;
+    this.progress = progress;
+  }
+
+  JobContextImpl(JobConf conf, org.apache.hadoop.mapreduce.JobID jobId) {
+    this(conf, jobId, Reporter.NULL);
+  }
+  
+  /**
+   * Get the job Configuration
+   * 
+   * @return JobConf
+   */
+  public JobConf getJobConf() {
+    return job;
+  }
+  
+  /**
+   * Get the progress mechanism for reporting progress.
+   * 
+   * @return progress mechanism 
+   */
+  public Progressable getProgressible() {
+    return progress;
+  }
+}

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Sep 18 23:52:56 2009
@@ -384,7 +384,7 @@
     this.numReduceTasks = conf.getNumReduceTasks();
     this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
        (numMapTasks + numReduceTasks + 10);
-    JobContext jobContext = new JobContext(conf, jobId);
+    JobContext jobContext = new JobContextImpl(conf, jobId);
     this.jobSetupCleanupNeeded = jobContext.getJobSetupCleanupNeeded();
 
     this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Fri Sep 18 23:52:56 2009
@@ -198,7 +198,7 @@
     @Override
     public void run() {
       JobID jobId = profile.getJobID();
-      JobContext jContext = new JobContext(job, jobId);
+      JobContext jContext = new JobContextImpl(job, jobId);
       OutputCommitter outputCommitter = job.getOutputCommitter();
       try {
         // split input into minimum number of splits

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Fri Sep 18 23:52:56 2009
@@ -53,6 +53,8 @@
 import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskCounter;
@@ -546,7 +548,8 @@
                              InterruptedException {
     // make a task context so we can get the classes
     org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
-      new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID());
+      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
+                                                                  getTaskID());
     // make a mapper
     org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
       (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
@@ -573,47 +576,33 @@
     
     job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
     org.apache.hadoop.mapreduce.RecordWriter output = null;
-    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
-         mapperContext = null;
-    try {
-      Constructor<org.apache.hadoop.mapreduce.Mapper.Context> contextConstructor =
-        org.apache.hadoop.mapreduce.Mapper.Context.class.getConstructor
-        (new Class[]{org.apache.hadoop.mapreduce.Mapper.class,
-                     Configuration.class,
-                     org.apache.hadoop.mapreduce.TaskAttemptID.class,
-                     org.apache.hadoop.mapreduce.RecordReader.class,
-                     org.apache.hadoop.mapreduce.RecordWriter.class,
-                     org.apache.hadoop.mapreduce.OutputCommitter.class,
-                     org.apache.hadoop.mapreduce.StatusReporter.class,
-                     org.apache.hadoop.mapreduce.InputSplit.class});
-
-      // get an output object
-      if (job.getNumReduceTasks() == 0) {
-        output = outputFormat.getRecordWriter(taskContext);
-      } else {
-        output = new NewOutputCollector(taskContext, job, umbilical, reporter);
-      }
+    
+    // get an output object
+    if (job.getNumReduceTasks() == 0) {
+      output = outputFormat.getRecordWriter(taskContext);
+    } else {
+      output = new NewOutputCollector(taskContext, job, umbilical, reporter);
+    }
 
-      mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),
-                                                     input, output, committer,
-                                                     reporter, split);
+    org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
+    mapContext = 
+      new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
+          input, output, 
+          committer, 
+          reporter, split);
 
-      input.initialize(split, mapperContext);
-      mapper.run(mapperContext);
-      mapPhase.complete();
-      setPhase(TaskStatus.Phase.SORT);
-      statusUpdate(umbilical);
-      input.close();
-      output.close(mapperContext);
-    } catch (NoSuchMethodException e) {
-      throw new IOException("Can't find Context constructor", e);
-    } catch (InstantiationException e) {
-      throw new IOException("Can't create Context", e);
-    } catch (InvocationTargetException e) {
-      throw new IOException("Can't invoke Context constructor", e);
-    } catch (IllegalAccessException e) {
-      throw new IOException("Can't invoke Context constructor", e);
-    }
+    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
+        mapperContext = 
+          new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
+              mapContext);
+
+    input.initialize(split, mapperContext);
+    mapper.run(mapperContext);
+    mapPhase.complete();
+    setPhase(TaskStatus.Phase.SORT);
+    statusUpdate(umbilical);
+    input.close();
+    output.close(mapperContext);
   }
 
   interface MapOutputCollector<K, V> {
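
The deleted try/catch machinery existed only because Mapper.Context is a non-static inner class: constructing one reflectively requires passing the enclosing Mapper instance as a hidden first constructor argument (hence Mapper.class leading the deleted getConstructor() parameter list). A standalone illustration of that quirk, not taken from the commit:

    import java.lang.reflect.Constructor;

    public class Outer {
      public class Inner {
        public Inner(int x) {
          System.out.println("x=" + x);
        }
      }

      public static void main(String[] args) throws Exception {
        // An inner class constructor's true signature begins with the
        // enclosing class, just as MapTask had to pass the Mapper when
        // reflectively building Mapper.Context.
        Constructor<Outer.Inner> ctor =
            Outer.Inner.class.getConstructor(Outer.class, int.class);
        ctor.newInstance(new Outer(), 42);
      }
    }

With MapContextImpl a free-standing class and WrappedMapper supplying the Mapper.Context adapter, plain constructor calls suffice, and the four reflection-related catch clauses go away. Task.java below makes the symmetric change for Reducer.Context.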

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Fri Sep 18 23:52:56 2009
@@ -516,7 +516,7 @@
     };
     // make a task context so we can get the classes
     org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
-      new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID());
+      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID());
     // make a reducer
     org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
       (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java Fri Sep 18 23:52:56 2009
@@ -47,6 +47,8 @@
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
+import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Progress;
@@ -433,8 +435,8 @@
                          boolean useNewApi) throws IOException, 
                                                    ClassNotFoundException,
                                                    InterruptedException {
-    jobContext = new JobContext(job, id, reporter);
-    taskContext = new TaskAttemptContext(job, taskId, reporter);
+    jobContext = new JobContextImpl(job, id, reporter);
+    taskContext = new TaskAttemptContextImpl(job, taskId, reporter);
     if (getState() == TaskStatus.State.UNASSIGNED) {
       setState(TaskStatus.State.RUNNING);
     }
@@ -1060,29 +1062,6 @@
     }
   }
 
-  private static final Constructor<org.apache.hadoop.mapreduce.Reducer.Context> 
-  contextConstructor;
-  static {
-    try {
-      contextConstructor = 
-        org.apache.hadoop.mapreduce.Reducer.Context.class.getConstructor
-        (new Class[]{org.apache.hadoop.mapreduce.Reducer.class,
-            Configuration.class,
-            org.apache.hadoop.mapreduce.TaskAttemptID.class,
-            RawKeyValueIterator.class,
-            org.apache.hadoop.mapreduce.Counter.class,
-            org.apache.hadoop.mapreduce.Counter.class,
-            org.apache.hadoop.mapreduce.RecordWriter.class,
-            org.apache.hadoop.mapreduce.OutputCommitter.class,
-            org.apache.hadoop.mapreduce.StatusReporter.class,
-            RawComparator.class,
-            Class.class,
-            Class.class});
-    } catch (NoSuchMethodException nme) {
-      throw new IllegalArgumentException("Can't find constructor");
-    }
-  }
-
   @SuppressWarnings("unchecked")
   protected static <INKEY,INVALUE,OUTKEY,OUTVALUE> 
   org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
@@ -1098,21 +1077,26 @@
                       org.apache.hadoop.mapreduce.StatusReporter reporter,
                       RawComparator<INKEY> comparator,
                       Class<INKEY> keyClass, Class<INVALUE> valueClass
-  ) throws IOException, ClassNotFoundException {
-    try {
+  ) throws IOException, InterruptedException {
+    org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
+    reduceContext = 
+      new ReduceContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, taskId, 
+                                                              rIter, 
+                                                              inputKeyCounter, 
+                                                              inputValueCounter, 
+                                                              output, 
+                                                              committer, 
+                                                              reporter, 
+                                                              comparator, 
+                                                              keyClass, 
+                                                              valueClass);
+
+    org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
+        reducerContext = 
+          new WrappedReducer<INKEY, INVALUE, OUTKEY, OUTVALUE>().getReducerContext(
+              reduceContext);
 
-      return contextConstructor.newInstance(reducer, job, taskId,
-                                            rIter, inputKeyCounter, 
-                                            inputValueCounter, output, 
-                                            committer, reporter, comparator, 
-                                            keyClass, valueClass);
-    } catch (InstantiationException e) {
-      throw new IOException("Can't create Context", e);
-    } catch (InvocationTargetException e) {
-      throw new IOException("Can't invoke Context constructor", e);
-    } catch (IllegalAccessException e) {
-      throw new IOException("Can't invoke Context constructor", e);
-    }
+    return reducerContext;
   }
 
   protected static abstract class CombinerRunner<K,V> {
@@ -1154,7 +1138,7 @@
       }
       // make a task context so we can get the classes
       org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
-        new org.apache.hadoop.mapreduce.TaskAttemptContext(job, taskId);
+        new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, taskId);
       Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> newcls = 
         (Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>)
            taskContext.getCombinerClass();

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskAttemptContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskAttemptContext.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskAttemptContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskAttemptContext.java Fri Sep 18 23:52:56 2009
@@ -24,39 +24,12 @@
  *   instead.
  */
 @Deprecated
-public class TaskAttemptContext 
+public interface TaskAttemptContext 
        extends org.apache.hadoop.mapreduce.TaskAttemptContext {
-  private Progressable progress;
 
-  TaskAttemptContext(JobConf conf, TaskAttemptID taskid) {
-    this(conf, taskid, Reporter.NULL);
-  }
-  
-  TaskAttemptContext(JobConf conf, TaskAttemptID taskid,
-                     Progressable progress) {
-    super(conf, taskid);
-    this.progress = progress;
-  }
-  
-  /**
-   * Get the taskAttemptID.
-   *  
-   * @return TaskAttemptID
-   */
-  public TaskAttemptID getTaskAttemptID() {
-    return (TaskAttemptID) super.getTaskAttemptID();
-  }
-  
-  public Progressable getProgressible() {
-    return progress;
-  }
-  
-  public JobConf getJobConf() {
-    return (JobConf) getConfiguration();
-  }
+  public TaskAttemptID getTaskAttemptID();
 
-  @Override
-  public void progress() {
-    progress.progress();
-  }
+  public Progressable getProgressible();
+  
+  public JobConf getJobConf();
 }

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskAttemptContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskAttemptContextImpl.java?rev=816816&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskAttemptContextImpl.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskAttemptContextImpl.java Fri Sep 18 23:52:56 2009
@@ -0,0 +1,63 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl}
+ *   instead.
+ */
+@Deprecated
+public class TaskAttemptContextImpl
+       extends org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl 
+       implements TaskAttemptContext {
+  private Progressable progress;
+
+  TaskAttemptContextImpl(JobConf conf, TaskAttemptID taskid) {
+    this(conf, taskid, Reporter.NULL);
+  }
+  
+  TaskAttemptContextImpl(JobConf conf, TaskAttemptID taskid,
+                         Progressable progress) {
+    super(conf, taskid);
+    this.progress = progress;
+  }
+  
+  /**
+   * Get the taskAttemptID.
+   *  
+   * @return TaskAttemptID
+   */
+  public TaskAttemptID getTaskAttemptID() {
+    return (TaskAttemptID) super.getTaskAttemptID();
+  }
+  
+  public Progressable getProgressible() {
+    return progress;
+  }
+  
+  public JobConf getJobConf() {
+    return (JobConf) getConfiguration();
+  }
+
+  @Override
+  public void progress() {
+    progress.progress();
+  }
+}
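
Both constructors above carry no access modifier, so they are package-private: only framework code in org.apache.hadoop.mapred (Task, FileOutputFormat, and the like) can instantiate this class. A hypothetical snippet that compiles only because it sits in the same package:

    package org.apache.hadoop.mapred;

    // Hypothetical: placed in org.apache.hadoop.mapred to reach the
    // package-private constructor; user code elsewhere cannot.
    public class OldApiContextSketch {
      static TaskAttemptContext newContext(JobConf conf) {
        return new TaskAttemptContextImpl(conf, new TaskAttemptID());
      }
    }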

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java Fri Sep 18 23:52:56 2009
@@ -31,6 +31,7 @@
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.util.Progressable;
 
 /**
@@ -69,7 +70,7 @@
   public RecordWriter<K, V> getRecordWriter(FileSystem filesystem,
       JobConf job, String name, Progressable progress) throws IOException {
     org.apache.hadoop.mapreduce.RecordWriter<K, V> w = super.getRecordWriter(
-      new TaskAttemptContext(job, 
+      new TaskAttemptContextImpl(job, 
             TaskAttemptID.forName(job.get(JobContext.TASK_ATTEMPT_ID))));
     org.apache.hadoop.mapreduce.lib.db.DBOutputFormat.DBRecordWriter writer = 
      (org.apache.hadoop.mapreduce.lib.db.DBOutputFormat.DBRecordWriter) w;

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java Fri Sep 18 23:52:56 2009
@@ -47,6 +47,7 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.util.ConfigUtil;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
@@ -84,8 +85,9 @@
  * 
  * 
  */
-public class Job extends JobContext {  
+public class Job extends JobContextImpl implements JobContext {  
   private static final Log LOG = LogFactory.getLog(Job.class);
+
   public static enum JobState {DEFINE, RUNNING};
   private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
   public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
@@ -544,7 +546,8 @@
   public void setInputFormatClass(Class<? extends InputFormat> cls
                                   ) throws IllegalStateException {
     ensureState(JobState.DEFINE);
-    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, InputFormat.class);
+    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, 
+                  InputFormat.class);
   }
 
   /**
@@ -555,7 +558,8 @@
   public void setOutputFormatClass(Class<? extends OutputFormat> cls
                                    ) throws IllegalStateException {
     ensureState(JobState.DEFINE);
-    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, OutputFormat.class);
+    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, 
+                  OutputFormat.class);
   }
 
   /**
@@ -626,7 +630,8 @@
   public void setPartitionerClass(Class<? extends Partitioner> cls
                                   ) throws IllegalStateException {
     ensureState(JobState.DEFINE);
-    conf.setClass(PARTITIONER_CLASS_ATTR, cls, Partitioner.class);
+    conf.setClass(PARTITIONER_CLASS_ATTR, cls, 
+                  Partitioner.class);
   }
 
   /**
@@ -919,12 +924,12 @@
       }      
     } else {
       String mode = "map compatability";
-      ensureNotSet(JobContext.INPUT_FORMAT_CLASS_ATTR, mode);
-      ensureNotSet(JobContext.MAP_CLASS_ATTR, mode);
+      ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
+      ensureNotSet(MAP_CLASS_ATTR, mode);
       if (numReduces != 0) {
-        ensureNotSet(JobContext.PARTITIONER_CLASS_ATTR, mode);
+        ensureNotSet(PARTITIONER_CLASS_ATTR, mode);
        } else {
-        ensureNotSet(JobContext.OUTPUT_FORMAT_CLASS_ATTR, mode);
+        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
       }
     }
     if (numReduces != 0) {
@@ -936,8 +941,8 @@
         ensureNotSet(oldReduceClass, mode);   
       } else {
         String mode = "reduce compatability";
-        ensureNotSet(JobContext.OUTPUT_FORMAT_CLASS_ATTR, mode);
-        ensureNotSet(JobContext.REDUCE_CLASS_ATTR, mode);   
+        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
+        ensureNotSet(REDUCE_CLASS_ATTR, mode);   
       }
     }   
   }
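
Since Job now extends the concrete JobContextImpl and implements the JobContext interface, a Job can be handed directly to anything expecting a read-only JobContext view. A small sketch; the Job(Configuration) constructor predates this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.JobContext;

    public class JobAsContextSketch {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration());
        job.setJobName("sketch");
        JobContext view = job;  // no adapter needed: a Job is a JobContext
        System.out.println(view.getJobName());
      }
    }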

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java Fri Sep 18 23:52:56 2009
@@ -26,16 +26,12 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 
 /**
  * A read-only view of the job that is provided to the tasks while they
  * are running.
  */
-public class JobContext {
+public interface JobContext {
   // Put all of the attribute names in here so that Job and JobContext are
   // consistent.
   public static final String INPUT_FORMAT_CLASS_ATTR = 
@@ -221,63 +217,43 @@
   public static final String REDUCE_MEMTOMEM_ENABLED = 
     "mapreduce.reduce.merge.memtomem.enabled";
 
-  protected final org.apache.hadoop.mapred.JobConf conf;
-  private final JobID jobId;
-  
-  public JobContext(Configuration conf, JobID jobId) {
-    this.conf = new org.apache.hadoop.mapred.JobConf(conf);
-    this.jobId = jobId;
-  }
-
   /**
    * Return the configuration for the job.
    * @return the shared configuration object
    */
-  public Configuration getConfiguration() {
-    return conf;
-  }
+  public Configuration getConfiguration();
 
   /**
    * Get the unique ID for the job.
    * @return the object with the job id
    */
-  public JobID getJobID() {
-    return jobId;
-  }
+  public JobID getJobID();
   
   /**
    * Get configured the number of reduce tasks for this job. Defaults to 
    * <code>1</code>.
    * @return the number of reduce tasks for this job.
    */
-  public int getNumReduceTasks() {
-    return conf.getNumReduceTasks();
-  }
+  public int getNumReduceTasks();
   
   /**
    * Get the current working directory for the default file system.
    * 
    * @return the directory name.
    */
-  public Path getWorkingDirectory() throws IOException {
-    return conf.getWorkingDirectory();
-  }
+  public Path getWorkingDirectory() throws IOException;
 
   /**
    * Get the key class for the job output data.
    * @return the key class for the job output data.
    */
-  public Class<?> getOutputKeyClass() {
-    return conf.getOutputKeyClass();
-  }
+  public Class<?> getOutputKeyClass();
   
   /**
    * Get the value class for job outputs.
    * @return the value class for job outputs.
    */
-  public Class<?> getOutputValueClass() {
-    return conf.getOutputValueClass();
-  }
+  public Class<?> getOutputValueClass();
 
   /**
    * Get the key class for the map output data. If it is not set, use the
@@ -285,9 +261,7 @@
    * different than the final output key class.
    * @return the map output key class.
    */
-  public Class<?> getMapOutputKeyClass() {
-    return conf.getMapOutputKeyClass();
-  }
+  public Class<?> getMapOutputKeyClass();
 
   /**
    * Get the value class for the map output data. If it is not set, use the
@@ -296,9 +270,7 @@
    *  
    * @return the map output value class.
    */
-  public Class<?> getMapOutputValueClass() {
-    return conf.getMapOutputValueClass();
-  }
+  public Class<?> getMapOutputValueClass();
 
   /**
    * Get the user-specified job name. This is only used to identify the 
@@ -306,98 +278,68 @@
    * 
    * @return the job's name, defaulting to "".
    */
-  public String getJobName() {
-    return conf.getJobName();
-  }
+  public String getJobName();
 
   /**
    * Get the {@link InputFormat} class for the job.
    * 
    * @return the {@link InputFormat} class for the job.
    */
-  @SuppressWarnings("unchecked")
   public Class<? extends InputFormat<?,?>> getInputFormatClass() 
-     throws ClassNotFoundException {
-    return (Class<? extends InputFormat<?,?>>) 
-      conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
-  }
+     throws ClassNotFoundException;
 
   /**
    * Get the {@link Mapper} class for the job.
    * 
    * @return the {@link Mapper} class for the job.
    */
-  @SuppressWarnings("unchecked")
   public Class<? extends Mapper<?,?,?,?>> getMapperClass() 
-     throws ClassNotFoundException {
-    return (Class<? extends Mapper<?,?,?,?>>) 
-      conf.getClass(MAP_CLASS_ATTR, Mapper.class);
-  }
+     throws ClassNotFoundException;
 
   /**
    * Get the combiner class for the job.
    * 
    * @return the combiner class for the job.
    */
-  @SuppressWarnings("unchecked")
   public Class<? extends Reducer<?,?,?,?>> getCombinerClass() 
-     throws ClassNotFoundException {
-    return (Class<? extends Reducer<?,?,?,?>>) 
-      conf.getClass(COMBINE_CLASS_ATTR, null);
-  }
+     throws ClassNotFoundException;
 
   /**
    * Get the {@link Reducer} class for the job.
    * 
    * @return the {@link Reducer} class for the job.
    */
-  @SuppressWarnings("unchecked")
   public Class<? extends Reducer<?,?,?,?>> getReducerClass() 
-     throws ClassNotFoundException {
-    return (Class<? extends Reducer<?,?,?,?>>) 
-      conf.getClass(REDUCE_CLASS_ATTR, Reducer.class);
-  }
+     throws ClassNotFoundException;
 
   /**
    * Get the {@link OutputFormat} class for the job.
    * 
    * @return the {@link OutputFormat} class for the job.
    */
-  @SuppressWarnings("unchecked")
   public Class<? extends OutputFormat<?,?>> getOutputFormatClass() 
-     throws ClassNotFoundException {
-    return (Class<? extends OutputFormat<?,?>>) 
-      conf.getClass(OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class);
-  }
+     throws ClassNotFoundException;
 
   /**
    * Get the {@link Partitioner} class for the job.
    * 
    * @return the {@link Partitioner} class for the job.
    */
-  @SuppressWarnings("unchecked")
   public Class<? extends Partitioner<?,?>> getPartitionerClass() 
-     throws ClassNotFoundException {
-    return (Class<? extends Partitioner<?,?>>) 
-      conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
-  }
+     throws ClassNotFoundException;
 
   /**
    * Get the {@link RawComparator} comparator used to compare keys.
    * 
    * @return the {@link RawComparator} comparator used to compare keys.
    */
-  public RawComparator<?> getSortComparator() {
-    return conf.getOutputKeyComparator();
-  }
+  public RawComparator<?> getSortComparator();
 
   /**
    * Get the pathname of the job's jar.
    * @return the pathname
    */
-  public String getJar() {
-    return conf.getJar();
-  }
+  public String getJar();
 
   /** 
    * Get the user defined {@link RawComparator} comparator for 
@@ -406,26 +348,20 @@
    * @return comparator set by the user for grouping values.
    * @see Job#setGroupingComparatorClass(Class) for details.  
    */
-  public RawComparator<?> getGroupingComparator() {
-    return conf.getOutputValueGroupingComparator();
-  }
+  public RawComparator<?> getGroupingComparator();
   
   /**
    * Get whether job-setup and job-cleanup is needed for the job 
    * 
    * @return boolean 
    */
-  public boolean getJobSetupCleanupNeeded() {
-    return conf.getBoolean(SETUP_CLEANUP_NEEDED, true);
-  }
+  public boolean getJobSetupCleanupNeeded();
 
   /**
    * Get whether the task profiling is enabled.
    * @return true if some tasks will be profiled
    */
-  public boolean getProfileEnabled() {
-    return conf.getProfileEnabled();
-  }
+  public boolean getProfileEnabled();
 
   /**
    * Get the profiler configuration arguments.
@@ -435,52 +371,40 @@
    * 
    * @return the parameters to pass to the task child to configure profiling
    */
-  public String getProfileParams() {
-    return conf.getProfileParams();
-  }
+  public String getProfileParams();
 
   /**
    * Get the range of maps or reduces to profile.
    * @param isMap is the task a map?
    * @return the task ranges
    */
-  public IntegerRanges getProfileTaskRange(boolean isMap) {
-    return conf.getProfileTaskRange(isMap);
-  }
+  public IntegerRanges getProfileTaskRange(boolean isMap);
 
   /**
    * Get the reported username for this job.
    * 
    * @return the username
    */
-  public String getUser() {
-    return conf.getUser();
-  }
+  public String getUser();
   
   /**
    * This method checks to see if symlinks are to be create for the 
    * localized cache files in the current working directory 
    * @return true if symlinks are to be created- else return false
    */
-  public boolean getSymlink() {
-    return DistributedCache.getSymlink(conf);
-  }
+  public boolean getSymlink();
   
   /**
    * Get the archive entries in classpath as an array of Path
    */
-  public Path[] getArchiveClassPaths() {
-    return DistributedCache.getArchiveClassPaths(conf);
-  }
+  public Path[] getArchiveClassPaths();
 
   /**
    * Get cache archives set in the Configuration
    * @return A URI array of the caches set in the Configuration
    * @throws IOException
    */
-  public URI[] getCacheArchives() throws IOException {
-    return DistributedCache.getCacheArchives(conf);
-  }
+  public URI[] getCacheArchives() throws IOException;
 
   /**
    * Get cache files set in the Configuration
@@ -488,36 +412,26 @@
    * @throws IOException
    */
 
-  public URI[] getCacheFiles() throws IOException {
-    return DistributedCache.getCacheFiles(conf);
-  }
+  public URI[] getCacheFiles() throws IOException;
 
   /**
    * Return the path array of the localized caches
    * @return A path array of localized caches
    * @throws IOException
    */
-  public Path[] getLocalCacheArchives()
-    throws IOException {
-    return DistributedCache.getLocalCacheArchives(conf);
-  }
+  public Path[] getLocalCacheArchives() throws IOException;
 
   /**
    * Return the path array of the localized files
    * @return A path array of localized files
    * @throws IOException
    */
-  public Path[] getLocalCacheFiles()
-    throws IOException {
-    return DistributedCache.getLocalCacheFiles(conf);
-  }
+  public Path[] getLocalCacheFiles() throws IOException;
 
   /**
    * Get the file entries in classpath as an array of Path
    */
-  public Path[] getFileClassPaths() {
-    return DistributedCache.getFileClassPaths(conf);
-  }
+  public Path[] getFileClassPaths();
   
   /**
    * Get the timestamps of the archives.  Used by internal
@@ -525,9 +439,7 @@
    * @return a string array of timestamps 
    * @throws IOException
    */
-  public String[] getArchiveTimestamps() {
-    return DistributedCache.getArchiveTimestamps(conf);
-  }
+  public String[] getArchiveTimestamps();
 
   /**
    * Get the timestamps of the files.  Used by internal
@@ -535,9 +447,7 @@
    * @return a string array of timestamps 
    * @throws IOException
    */
-  public String[] getFileTimestamps() {
-    return DistributedCache.getFileTimestamps(conf);
-  }
+  public String[] getFileTimestamps();
 
   /** 
    * Get the configured number of maximum attempts that will be made to run a
@@ -546,9 +456,7 @@
    *  
    * @return the max number of attempts per map task.
    */
-  public int getMaxMapAttempts() {
-    return conf.getMaxMapAttempts();
-  }
+  public int getMaxMapAttempts();
 
   /** 
    * Get the configured number of maximum attempts  that will be made to run a
@@ -557,8 +465,6 @@
    * 
    * @return the max number of attempts per reduce task.
    */
-  public int getMaxReduceAttempts() {
-    return conf.getMaxReduceAttempts();
-  }
+  public int getMaxReduceAttempts();
 
 }
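
With JobContext reduced to a pure interface, callers can program against the
job's settings without depending on a configuration-backed implementation
class. A minimal sketch of interface-only consumer code (the class name and
the printed fields are illustrative, not part of this commit):

    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.OutputFormat;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class JobContextInspector {
      /** Dump a few job settings, touching only the JobContext interface. */
      public static void describe(JobContext context)
          throws ClassNotFoundException {
        Class<? extends OutputFormat<?,?>> outputFormat =
          context.getOutputFormatClass();
        Class<? extends Partitioner<?,?>> partitioner =
          context.getPartitionerClass();
        System.out.println("output format    = " + outputFormat.getName());
        System.out.println("partitioner      = " + partitioner.getName());
        System.out.println("max map attempts = " + context.getMaxMapAttempts());
      }
    }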

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MapContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MapContext.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MapContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/MapContext.java Fri Sep 18 23:52:56 2009
@@ -18,10 +18,6 @@
 
 package org.apache.hadoop.mapreduce;
 
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-
 /**
  * The context that is given to the {@link Mapper}.
  * @param <KEYIN> the key input type to the Mapper
@@ -29,43 +25,13 @@
  * @param <KEYOUT> the key output type from the Mapper
  * @param <VALUEOUT> the value output type from the Mapper
  */
-public class MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
+public interface MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
   extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
-  private RecordReader<KEYIN,VALUEIN> reader;
-  private InputSplit split;
-
-  public MapContext(Configuration conf, TaskAttemptID taskid,
-                    RecordReader<KEYIN,VALUEIN> reader,
-                    RecordWriter<KEYOUT,VALUEOUT> writer,
-                    OutputCommitter committer,
-                    StatusReporter reporter,
-                    InputSplit split) {
-    super(conf, taskid, writer, committer, reporter);
-    this.reader = reader;
-    this.split = split;
-  }
 
   /**
    * Get the input split for this map.
    */
-  public InputSplit getInputSplit() {
-    return split;
-  }
-
-  @Override
-  public KEYIN getCurrentKey() throws IOException, InterruptedException {
-    return reader.getCurrentKey();
-  }
-
-  @Override
-  public VALUEIN getCurrentValue() throws IOException, InterruptedException {
-    return reader.getCurrentValue();
-  }
-
-  @Override
-  public boolean nextKeyValue() throws IOException, InterruptedException {
-    return reader.nextKeyValue();
-  }
-
+  public InputSplit getInputSplit();
+  
 }
      
\ No newline at end of file
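
User code is largely unaffected by MapContext becoming an interface: a mapper
still receives a Context and can query its split as before. A hedged sketch
(the FileSplit cast assumes a file-based InputFormat; the class name and
key/value types are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    public class SplitAwareMapper
        extends Mapper<LongWritable, Text, Text, LongWritable> {
      private Text file;

      @Override
      protected void setup(Context context) {
        // getInputSplit() is the one method MapContext declares beyond
        // TaskInputOutputContext; cast to FileSplit for file-based input.
        file = new Text(
          ((FileSplit) context.getInputSplit()).getPath().getName());
      }

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        context.write(file, new LongWritable(1));  // count lines per file
      }
    }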

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Mapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Mapper.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Mapper.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Mapper.java Fri Sep 18 23:52:56 2009
@@ -23,6 +23,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
 
 /** 
  * Maps input key/value pairs to a set of intermediate key/value pairs.  
@@ -94,16 +95,11 @@
  */
 public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
 
-  public class Context 
-    extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
-    public Context(Configuration conf, TaskAttemptID taskid,
-                   RecordReader<KEYIN,VALUEIN> reader,
-                   RecordWriter<KEYOUT,VALUEOUT> writer,
-                   OutputCommitter committer,
-                   StatusReporter reporter,
-                   InputSplit split) throws IOException, InterruptedException {
-      super(conf, taskid, reader, writer, committer, reporter, split);
-    }
+  /**
+   * The <code>Context</code> passed on to the {@link Mapper} implementations.
+   */
+  public abstract class Context
+    implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
   }
   
   /**

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ReduceContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ReduceContext.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ReduceContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ReduceContext.java Fri Sep 18 23:52:56 2009
@@ -18,22 +18,8 @@
 
 package org.apache.hadoop.mapreduce;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.mapred.BackupStore;
-import org.apache.hadoop.mapred.RawKeyValueIterator;
-import org.apache.hadoop.util.Progressable;
 
 /**
  * The context passed to the {@link Reducer}.
@@ -42,311 +28,30 @@
  * @param <KEYOUT> the class of the output keys
  * @param <VALUEOUT> the class of the output values
  */
-public class ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
+public interface ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
     extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
-  private RawKeyValueIterator input;
-  private Counter inputValueCounter;
-  private Counter inputKeyCounter;
-  private RawComparator<KEYIN> comparator;
-  private KEYIN key;                                  // current key
-  private VALUEIN value;                              // current value
-  private boolean firstValue = false;                 // first value in key
-  private boolean nextKeyIsSame = false;              // more w/ this key
-  private boolean hasMore;                            // more in file
-  protected Progressable reporter;
-  private Deserializer<KEYIN> keyDeserializer;
-  private Deserializer<VALUEIN> valueDeserializer;
-  private DataInputBuffer buffer = new DataInputBuffer();
-  private BytesWritable currentRawKey = new BytesWritable();
-  private ValueIterable iterable = new ValueIterable();
-  private boolean isMarked = false;
-  private BackupStore<KEYIN,VALUEIN> backupStore;
-  private final SerializationFactory serializationFactory;
-  private final Class<KEYIN> keyClass;
-  private final Class<VALUEIN> valueClass;
-  private final Configuration conf;
-  private final TaskAttemptID taskid;
-  private int currentKeyLength = -1;
-  private int currentValueLength = -1;
-  
-  public ReduceContext(Configuration conf, TaskAttemptID taskid,
-                       RawKeyValueIterator input, 
-                       Counter inputKeyCounter,
-                       Counter inputValueCounter,
-                       RecordWriter<KEYOUT,VALUEOUT> output,
-                       OutputCommitter committer,
-                       StatusReporter reporter,
-                       RawComparator<KEYIN> comparator,
-                       Class<KEYIN> keyClass,
-                       Class<VALUEIN> valueClass
-                       ) throws InterruptedException, IOException{
-    super(conf, taskid, output, committer, reporter);
-    this.input = input;
-    this.inputKeyCounter = inputKeyCounter;
-    this.inputValueCounter = inputValueCounter;
-    this.comparator = comparator;
-    this.serializationFactory = new SerializationFactory(conf);
-    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
-    this.keyDeserializer.open(buffer);
-    this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
-    this.valueDeserializer.open(buffer);
-    hasMore = input.next();
-    this.keyClass = keyClass;
-    this.valueClass = valueClass;
-    this.conf = conf;
-    this.taskid = taskid;
-  }
 
   /** Start processing next unique key. */
-  public boolean nextKey() throws IOException,InterruptedException {
-    while (hasMore && nextKeyIsSame) {
-      nextKeyValue();
-    }
-    if (hasMore) {
-      if (inputKeyCounter != null) {
-        inputKeyCounter.increment(1);
-      }
-      return nextKeyValue();
-    } else {
-      return false;
-    }
-  }
+  public boolean nextKey() throws IOException,InterruptedException;
 
   /**
-   * Advance to the next key/value pair.
+   * Iterate through the values for the current key, reusing the same value 
+   * object, which is stored in the context.
+   * @return the series of values associated with the current key. All of the 
+   * objects returned directly and indirectly from this method are reused.
    */
-  @Override
-  public boolean nextKeyValue() throws IOException, InterruptedException {
-    if (!hasMore) {
-      key = null;
-      value = null;
-      return false;
-    }
-    firstValue = !nextKeyIsSame;
-    DataInputBuffer nextKey = input.getKey();
-    currentRawKey.set(nextKey.getData(), nextKey.getPosition(), 
-                      nextKey.getLength() - nextKey.getPosition());
-    buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
-    key = keyDeserializer.deserialize(key);
-    DataInputBuffer nextVal = input.getValue();
-    buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength());
-    value = valueDeserializer.deserialize(value);
-
-    currentKeyLength = nextKey.getLength() - nextKey.getPosition();
-    currentValueLength = nextVal.getLength() - nextVal.getPosition();
-
-    if (isMarked) {
-      backupStore.write(nextKey, nextVal);
-    }
-
-    hasMore = input.next();
-    if (hasMore) {
-      nextKey = input.getKey();
-      nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, 
-                                     currentRawKey.getLength(),
-                                     nextKey.getData(),
-                                     nextKey.getPosition(),
-                                     nextKey.getLength() - nextKey.getPosition()
-                                         ) == 0;
-    } else {
-      nextKeyIsSame = false;
-    }
-    inputValueCounter.increment(1);
-    return true;
-  }
-
-  public KEYIN getCurrentKey() {
-    return key;
-  }
-
-  @Override
-  public VALUEIN getCurrentValue() {
-    return value;
-  }
-  
-
-  
-  protected class ValueIterator implements MarkableIteratorInterface<VALUEIN> {
-
-    private boolean inReset = false;
-    private boolean clearMarkFlag = false;
-    
-    @Override
-    public boolean hasNext() {
-      try {
-        if (inReset && backupStore.hasNext()) {
-          return true;
-        } 
-      } catch (Exception e) {
-        e.printStackTrace();
-        throw new RuntimeException("hasNext failed", e);
-      }
-      return firstValue || nextKeyIsSame;
-    }
-
-    @Override
-    public VALUEIN next() {
-      if (inReset) {
-        try {
-          if (backupStore.hasNext()) {
-            backupStore.next();
-            DataInputBuffer next = backupStore.nextValue();
-            buffer.reset(next.getData(), next.getPosition(), next.getLength());
-            value = valueDeserializer.deserialize(value);
-            return value;
-          } else {
-            inReset = false;
-            backupStore.exitResetMode();
-            if (clearMarkFlag) {
-              clearMarkFlag = false;
-              isMarked = false;
-            }
-          }
-        } catch (IOException e) {
-          e.printStackTrace();
-          throw new RuntimeException("next value iterator failed", e);
-        }
-      } 
-
-      // if this is the first record, we don't need to advance
-      if (firstValue) {
-        firstValue = false;
-        return value;
-      }
-      // if this isn't the first record and the next key is different, they
-      // can't advance it here.
-      if (!nextKeyIsSame) {
-        throw new NoSuchElementException("iterate past last value");
-      }
-      // otherwise, go to the next key/value pair
-      try {
-        nextKeyValue();
-        return value;
-      } catch (IOException ie) {
-        throw new RuntimeException("next value iterator failed", ie);
-      } catch (InterruptedException ie) {
-        // this is bad, but we can't modify the exception list of java.util
-        throw new RuntimeException("next value iterator interrupted", ie);        
-      }
-    }
+  public Iterable<VALUEIN> getValues() throws IOException, InterruptedException;
 
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException("remove not implemented");
-    }
-
-    @Override
-    public void mark() throws IOException {
-      if (backupStore == null) {
-        backupStore = new BackupStore<KEYIN,VALUEIN>(conf, taskid);
-      }
-      isMarked = true;
-      if (!inReset) {
-        backupStore.reinitialize();
-        if (currentKeyLength == -1) {
-          // The user has not called next() for this iterator yet, so
-          // there is no current record to mark and copy to backup store.
-          return;
-        }
-        assert (currentValueLength != -1);
-        int requestedSize = currentKeyLength + currentValueLength + 
-          WritableUtils.getVIntSize(currentKeyLength) +
-          WritableUtils.getVIntSize(currentValueLength);
-        DataOutputStream out = backupStore.getOutputStream(requestedSize);
-        writeFirstKeyValueBytes(out);
-        backupStore.updateCounters(requestedSize);
-      } else {
-        backupStore.mark();
-      }
-    }
-
-    @Override
-    public void reset() throws IOException {
-      // We reached the end of an iteration and user calls a 
-      // reset, but a clearMark was called before, just throw
-      // an exception
-      if (clearMarkFlag) {
-        clearMarkFlag = false;
-        backupStore.clearMark();
-        throw new IOException("Reset called without a previous mark");
-      }
-      
-      if (!isMarked) {
-        throw new IOException("Reset called without a previous mark");
-      }
-      inReset = true;
-      backupStore.reset();
-    }
+  /**
+   * {@link Iterator} to iterate over values for a given group of records.
+   */
+  interface ValueIterator<VALUEIN> extends MarkableIteratorInterface<VALUEIN> {
 
-    @Override
-    public void clearMark() throws IOException {
-      if (backupStore == null) {
-        return;
-      }
-      if (inReset) {
-        clearMarkFlag = true;
-        backupStore.clearMark();
-      } else {
-        inReset = isMarked = false;
-        backupStore.reinitialize();
-      }
-    }
-	  
     /**
      * This method is called when the reducer moves from one key to 
      * another.
      * @throws IOException
      */
-    void resetBackupStore() throws IOException {
-      if (backupStore == null) {
-        return;
-      }
-      inReset = isMarked = false;
-      backupStore.reinitialize();
-      currentKeyLength = -1;
-    }
-
-    /**
-     * This method is called to write the record that was most recently
-     * served (before a call to the mark). Since the framework reads one
-     * record in advance, to get this record, we serialize the current key
-     * and value
-     * @param out
-     * @throws IOException
-     */
-    private void writeFirstKeyValueBytes(DataOutputStream out) 
-    throws IOException {
-      assert (getCurrentKey() != null && getCurrentValue() != null);
-      WritableUtils.writeVInt(out, currentKeyLength);
-      WritableUtils.writeVInt(out, currentValueLength);
-      Serializer<KEYIN> keySerializer = 
-        serializationFactory.getSerializer(keyClass);
-      keySerializer.open(out);
-      keySerializer.serialize(getCurrentKey());
-
-      Serializer<VALUEIN> valueSerializer = 
-        serializationFactory.getSerializer(valueClass);
-      valueSerializer.open(out);
-      valueSerializer.serialize(getCurrentValue());
-    }
-  }
-
-  protected class ValueIterable implements Iterable<VALUEIN> {
-    private ValueIterator iterator = new ValueIterator();
-    @Override
-    public Iterator<VALUEIN> iterator() {
-      return iterator;
-    } 
-  }
-  
-  /**
-   * Iterate through the values for the current key, reusing the same value 
-   * object, which is stored in the context.
-   * @return the series of values associated with the current key. All of the 
-   * objects returned directly and indirectly from this method are reused.
-   */
-  public 
-  Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
-    return iterable;
+    void resetBackupStore() throws IOException;
   }
 }
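
The reuse contract documented on getValues() matters in practice: the
framework deserializes every value into the same object, so a value must be
copied if it is retained past the current loop iteration. A minimal
word-count-style reducer illustrating the pattern (names are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class SumReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {
      private final IntWritable result = new IntWritable();

      @Override
      protected void reduce(Text key, Iterable<IntWritable> values,
                            Context context)
          throws IOException, InterruptedException {
        int sum = 0;
        // 'values' is the Iterable from ReduceContext.getValues(); each
        // iteration rewrites the same IntWritable instance.
        for (IntWritable v : values) {
          sum += v.get();
        }
        result.set(sum);
        context.write(key, result);
      }
    }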

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Reducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Reducer.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Reducer.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Reducer.java Fri Sep 18 23:52:56 2009
@@ -117,23 +117,11 @@
  */
 public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
 
-  public class Context 
-    extends ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
-    public Context(Configuration conf, TaskAttemptID taskid,
-                   RawKeyValueIterator input, 
-                   Counter inputKeyCounter,
-                   Counter inputValueCounter,
-                   RecordWriter<KEYOUT,VALUEOUT> output,
-                   OutputCommitter committer,
-                   StatusReporter reporter,
-                   RawComparator<KEYIN> comparator,
-                   Class<KEYIN> keyClass,
-                   Class<VALUEIN> valueClass
-                   ) throws IOException, InterruptedException {
-      super(conf, taskid, input, inputKeyCounter, inputValueCounter, 
-          output, committer, reporter, 
-          comparator, keyClass, valueClass);
-    }
+  /**
+   * The <code>Context</code> passed on to the {@link Reducer} implementations.
+   */
+  public abstract class Context 
+    implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
   }
 
   /**

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java Fri Sep 18 23:52:56 2009
@@ -18,49 +18,27 @@
 
 package org.apache.hadoop.mapreduce;
 
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.Progressable;
 
 /**
  * The context for task attempts.
  */
-public class TaskAttemptContext extends JobContext implements Progressable {
-  private final TaskAttemptID taskId;
-  private String status = "";
-  
-  public TaskAttemptContext(Configuration conf, 
-                            TaskAttemptID taskId) {
-    super(conf, taskId.getJobID());
-    this.taskId = taskId;
-  }
+public interface TaskAttemptContext extends JobContext, Progressable {
 
   /**
    * Get the unique name for this task attempt.
    */
-  public TaskAttemptID getTaskAttemptID() {
-    return taskId;
-  }
+  public TaskAttemptID getTaskAttemptID();
 
   /**
    * Set the current status of the task to the given string.
    */
-  public void setStatus(String msg) throws IOException {
-    status = msg;
-  }
+  public void setStatus(String msg);
 
   /**
    * Get the last set status message.
    * @return the current status message
    */
-  public String getStatus() {
-    return status;
-  }
+  public String getStatus();
 
-  /**
-   * Report progress. The subtypes actually do work in this method.
-   */
-  public void progress() { 
-  }
 }
\ No newline at end of file
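
Since TaskAttemptContext now extends Progressable directly and setStatus() no
longer declares IOException, liveness reporting from long-running tasks stays
simple. A small illustrative helper, assuming a caller that tracks its own
record count (not part of this commit):

    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public final class Heartbeat {
      private Heartbeat() {}

      /** Report liveness periodically during long per-record work. */
      public static void tick(TaskAttemptContext context, long recordsDone) {
        if (recordsDone % 10000 == 0) {
          context.setStatus("records processed: " + recordsDone);
          context.progress();  // inherited from Progressable
        }
      }
    }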

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskInputOutputContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskInputOutputContext.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskInputOutputContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskInputOutputContext.java Fri Sep 18 23:52:56 2009
@@ -20,9 +20,6 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.Progressable;
-
 /**
  * A context object that allows input and output from the task. It is only
  * supplied to the {@link Mapper} or {@link Reducer}.
@@ -31,28 +28,14 @@
  * @param <KEYOUT> the output key type for the task
  * @param <VALUEOUT> the output value type for the task
  */
-public abstract class TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
-       extends TaskAttemptContext implements Progressable {
-  private RecordWriter<KEYOUT,VALUEOUT> output;
-  private StatusReporter reporter;
-  private OutputCommitter committer;
-
-  public TaskInputOutputContext(Configuration conf, TaskAttemptID taskid,
-                                RecordWriter<KEYOUT,VALUEOUT> output,
-                                OutputCommitter committer,
-                                StatusReporter reporter) {
-    super(conf, taskid);
-    this.output = output;
-    this.reporter = reporter;
-    this.committer = committer;
-  }
+public interface TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
+       extends TaskAttemptContext {
 
   /**
    * Advance to the next key/value pair.
    * @return true if a key/value pair was read, false if at end of input
    */
-  public abstract 
-  boolean nextKeyValue() throws IOException, InterruptedException;
+  public boolean nextKeyValue() throws IOException, InterruptedException;
  
   /**
    * Get the current key.
@@ -60,8 +43,7 @@
    * @throws IOException
    * @throws InterruptedException
    */
-  public abstract 
-  KEYIN getCurrentKey() throws IOException, InterruptedException;
+  public KEYIN getCurrentKey() throws IOException, InterruptedException;
 
   /**
    * Get the current value.
@@ -69,36 +51,33 @@
    * @throws IOException
    * @throws InterruptedException
    */
-  public abstract VALUEIN getCurrentValue() throws IOException, 
-                                                   InterruptedException;
+  public VALUEIN getCurrentValue() throws IOException, InterruptedException;
 
   /**
    * Generate an output key/value pair.
    */
-  public void write(KEYOUT key, VALUEOUT value
-                    ) throws IOException, InterruptedException {
-    output.write(key, value);
-  }
-
-  public Counter getCounter(Enum<?> counterName) {
-    return reporter.getCounter(counterName);
-  }
-
-  public Counter getCounter(String groupName, String counterName) {
-    return reporter.getCounter(groupName, counterName);
-  }
-
-  @Override
-  public void progress() {
-    reporter.progress();
-  }
-
-  @Override
-  public void setStatus(String status) {
-    reporter.setStatus(status);
-  }
-  
-  public OutputCommitter getOutputCommitter() {
-    return committer;
-  }
+  public void write(KEYOUT key, VALUEOUT value) 
+      throws IOException, InterruptedException;
+
+  /**
+   * Get the {@link Counter} for the given <code>counterName</code>.
+   * @param counterName counter name
+   * @return the <code>Counter</code> for the given <code>counterName</code>
+   */
+  public Counter getCounter(Enum<?> counterName);
+
+  /**
+   * Get the {@link Counter} for the given <code>groupName</code> and 
+   * <code>counterName</code>.
+   * @param groupName counter group name
+   * @param counterName counter name
+   * @return the <code>Counter</code> for the given <code>groupName</code> and 
+   *         <code>counterName</code>
+   */
+  public Counter getCounter(String groupName, String counterName);
+
+  /**
+   * Get the {@link OutputCommitter} for the task-attempt.
+   * @return the <code>OutputCommitter</code> for the task-attempt
+   */
+  public OutputCommitter getOutputCommitter();
 }
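
The counter accessors move onto the TaskInputOutputContext interface
unchanged, so per-task bookkeeping can be written against the interface
alone. An illustrative helper (the group and counter names are made up):

    import org.apache.hadoop.mapreduce.TaskInputOutputContext;

    public final class BadRecords {
      private BadRecords() {}

      /** Count a malformed record and surface it in the task status. */
      public static void record(TaskInputOutputContext<?,?,?,?> context,
                                Throwable cause) {
        context.getCounter("Application", "BAD_RECORDS").increment(1);
        context.setStatus("last bad record: " + cause.getMessage());
      }
    }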

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/Parser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/Parser.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/Parser.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/Parser.java Fri Sep 18 23:52:56 2009
@@ -42,6 +42,8 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /**
@@ -309,8 +311,9 @@
     
     public List<InputSplit> getSplits(JobContext context)
         throws IOException, InterruptedException {
-      return inf.getSplits(new JobContext(
-        getConf(context.getConfiguration()), context.getJobID()));
+      return inf.getSplits(
+                 new JobContextImpl(getConf(context.getConfiguration()), 
+                                    context.getJobID()));
     }
 
     public ComposableRecordReader<?, ?> createRecordReader(InputSplit split, 
@@ -321,8 +324,10 @@
           throw new IOException("No RecordReader for " + ident);
         }
         Configuration conf = getConf(taskContext.getConfiguration());
-        TaskAttemptContext context = new TaskAttemptContext(conf, 
-          TaskAttemptID.forName(conf.get(JobContext.TASK_ATTEMPT_ID)));
+        TaskAttemptContext context = 
+          new TaskAttemptContextImpl(conf, 
+                                     TaskAttemptID.forName(
+                                         conf.get(JobContext.TASK_ATTEMPT_ID)));
         return rrCstrMap.get(ident).newInstance(id,
             inf.createRecordReader(split, context), cmpcl);
       } catch (IllegalAccessException e) {
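
The pattern Parser uses here -- instantiate the new task-side Impl classes
wherever a concrete context is required -- also works for driving
InputFormats and OutputFormats outside a running job, e.g. in tests. A
minimal sketch (the attempt-id string is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

    public class ContextFactory {
      /** Build a concrete TaskAttemptContext outside of a running task. */
      public static TaskAttemptContext newContext(Configuration conf) {
        TaskAttemptID id =
          TaskAttemptID.forName("attempt_200909182352_0001_m_000000_0");
        return new TaskAttemptContextImpl(conf, id);
      }
    }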

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java Fri Sep 18 23:52:56 2009
@@ -24,11 +24,13 @@
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MapContext;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.StatusReporter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -244,13 +246,15 @@
     MapRunner(Context context) throws IOException, InterruptedException {
       mapper = ReflectionUtils.newInstance(mapClass, 
                                            context.getConfiguration());
-      subcontext = new Context(outer.getConfiguration(), 
-                            outer.getTaskAttemptID(),
-                            new SubMapRecordReader(),
-                            new SubMapRecordWriter(), 
-                            context.getOutputCommitter(),
-                            new SubMapStatusReporter(),
-                            outer.getInputSplit());
+      MapContext<K1, V1, K2, V2> mapContext = 
+        new MapContextImpl<K1, V1, K2, V2>(outer.getConfiguration(), 
+                                           outer.getTaskAttemptID(),
+                                           new SubMapRecordReader(),
+                                           new SubMapRecordWriter(), 
+                                           context.getOutputCommitter(),
+                                           new SubMapStatusReporter(),
+                                           outer.getInputSplit());
+      subcontext = new WrappedMapper<K1, V1, K2, V2>().getMapContext(mapContext);
     }
 
     public Throwable getThrowable() {
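
The subcontext construction above is the general recipe for composing
mappers after this change: build a MapContextImpl over custom readers and
writers, then let WrappedMapper adapt it to the Mapper.Context type that
user code expects. A type-specialized sketch of the adaptation step (the
key/value types are illustrative):

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.MapContext;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;

    public class ContextAdapter {
      /** Adapt a MapContext into the Mapper.Context expected by map(). */
      public static Mapper<LongWritable, Text, Text, LongWritable>.Context
          adapt(MapContext<LongWritable, Text, Text, LongWritable> ctx) {
        return new WrappedMapper<LongWritable, Text, Text, LongWritable>()
            .getMapContext(ctx);
      }
    }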


