hadoop-mapreduce-commits mailing list archives

From: acmur...@apache.org
Subject: svn commit: r816816 [2/2] - in /hadoop/mapreduce/trunk: ./ src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/ src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/ src/contrib/vertica/src/test/org/apache/hadoop/vertica/ src/...
Date: Fri, 18 Sep 2009 23:52:58 GMT
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java?rev=816816&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java Fri Sep 18 23:52:56 2009
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.map;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * A {@link Mapper} which wraps a given one to allow custom 
+ * {@link Mapper.Context} implementations.
+ */
+public class WrappedMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
+    extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+  
+  /**
+   * Get a wrapped {@link Mapper.Context} for custom implementations.
+   * @param mapContext <code>MapContext</code> to be wrapped
+   * @return a wrapped <code>Mapper.Context</code> for custom implementations
+   */
+  public Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context
+  getMapContext(MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext) {
+    return new Context(mapContext);
+  }
+  
+  public class Context 
+      extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context {
+
+    protected MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext;
+
+    public Context(MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext) {
+      this.mapContext = mapContext;
+    }
+
+    /**
+     * Get the input split for this map.
+     */
+    public InputSplit getInputSplit() {
+      return mapContext.getInputSplit();
+    }
+
+    @Override
+    public KEYIN getCurrentKey() throws IOException, InterruptedException {
+      return mapContext.getCurrentKey();
+    }
+
+    @Override
+    public VALUEIN getCurrentValue() throws IOException, InterruptedException {
+      return mapContext.getCurrentValue();
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      return mapContext.nextKeyValue();
+    }
+
+    @Override
+    public Counter getCounter(Enum<?> counterName) {
+      return mapContext.getCounter(counterName);
+    }
+
+    @Override
+    public Counter getCounter(String groupName, String counterName) {
+      return mapContext.getCounter(groupName, counterName);
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter() {
+      return mapContext.getOutputCommitter();
+    }
+
+    @Override
+    public void write(KEYOUT key, VALUEOUT value) throws IOException,
+        InterruptedException {
+      mapContext.write(key, value);
+    }
+
+    @Override
+    public String getStatus() {
+      return mapContext.getStatus();
+    }
+
+    @Override
+    public TaskAttemptID getTaskAttemptID() {
+      return mapContext.getTaskAttemptID();
+    }
+
+    @Override
+    public void setStatus(String msg) {
+      mapContext.setStatus(msg);
+    }
+
+    @Override
+    public Path[] getArchiveClassPaths() {
+      return mapContext.getArchiveClassPaths();
+    }
+
+    @Override
+    public String[] getArchiveTimestamps() {
+      return mapContext.getArchiveTimestamps();
+    }
+
+    @Override
+    public URI[] getCacheArchives() throws IOException {
+      return mapContext.getCacheArchives();
+    }
+
+    @Override
+    public URI[] getCacheFiles() throws IOException {
+      return mapContext.getCacheFiles();
+    }
+
+    @Override
+    public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass()
+        throws ClassNotFoundException {
+      return mapContext.getCombinerClass();
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+      return mapContext.getConfiguration();
+    }
+
+    @Override
+    public Path[] getFileClassPaths() {
+      return mapContext.getFileClassPaths();
+    }
+
+    @Override
+    public String[] getFileTimestamps() {
+      return mapContext.getFileTimestamps();
+    }
+
+    @Override
+    public RawComparator<?> getGroupingComparator() {
+      return mapContext.getGroupingComparator();
+    }
+
+    @Override
+    public Class<? extends InputFormat<?, ?>> getInputFormatClass()
+        throws ClassNotFoundException {
+      return mapContext.getInputFormatClass();
+    }
+
+    @Override
+    public String getJar() {
+      return mapContext.getJar();
+    }
+
+    @Override
+    public JobID getJobID() {
+      return mapContext.getJobID();
+    }
+
+    @Override
+    public String getJobName() {
+      return mapContext.getJobName();
+    }
+
+    @Override
+    public boolean getJobSetupCleanupNeeded() {
+      return mapContext.getJobSetupCleanupNeeded();
+    }
+
+    @Override
+    public Path[] getLocalCacheArchives() throws IOException {
+      return mapContext.getLocalCacheArchives();
+    }
+
+    @Override
+    public Path[] getLocalCacheFiles() throws IOException {
+      return mapContext.getLocalCacheFiles();
+    }
+
+    @Override
+    public Class<?> getMapOutputKeyClass() {
+      return mapContext.getMapOutputKeyClass();
+    }
+
+    @Override
+    public Class<?> getMapOutputValueClass() {
+      return mapContext.getMapOutputValueClass();
+    }
+
+    @Override
+    public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass()
+        throws ClassNotFoundException {
+      return mapContext.getMapperClass();
+    }
+
+    @Override
+    public int getMaxMapAttempts() {
+      return mapContext.getMaxMapAttempts();
+    }
+
+    @Override
+    public int getMaxReduceAttempts() {
+      return mapContext.getMaxReduceAttempts();
+    }
+
+    @Override
+    public int getNumReduceTasks() {
+      return mapContext.getNumReduceTasks();
+    }
+
+    @Override
+    public Class<? extends OutputFormat<?, ?>> getOutputFormatClass()
+        throws ClassNotFoundException {
+      return mapContext.getOutputFormatClass();
+    }
+
+    @Override
+    public Class<?> getOutputKeyClass() {
+      return mapContext.getOutputKeyClass();
+    }
+
+    @Override
+    public Class<?> getOutputValueClass() {
+      return mapContext.getOutputValueClass();
+    }
+
+    @Override
+    public Class<? extends Partitioner<?, ?>> getPartitionerClass()
+        throws ClassNotFoundException {
+      return mapContext.getPartitionerClass();
+    }
+
+    @Override
+    public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass()
+        throws ClassNotFoundException {
+      return mapContext.getReducerClass();
+    }
+
+    @Override
+    public RawComparator<?> getSortComparator() {
+      return mapContext.getSortComparator();
+    }
+
+    @Override
+    public boolean getSymlink() {
+      return mapContext.getSymlink();
+    }
+
+    @Override
+    public Path getWorkingDirectory() throws IOException {
+      return mapContext.getWorkingDirectory();
+    }
+
+    @Override
+    public void progress() {
+      mapContext.progress();
+    }
+
+    @Override
+    public boolean getProfileEnabled() {
+      return mapContext.getProfileEnabled();
+    }
+
+    @Override
+    public String getProfileParams() {
+      return mapContext.getProfileParams();
+    }
+
+    @Override
+    public IntegerRanges getProfileTaskRange(boolean isMap) {
+      return mapContext.getProfileTaskRange(isMap);
+    }
+
+    @Override
+    public String getUser() {
+      return mapContext.getUser();
+    }
+  }
+}

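WrappedMapper is the extension point that lets runtime code hand a Mapper a
Context backed by any MapContext. A minimal sketch of how it might be
subclassed to intercept write(); LoggingWrappedMapper and LoggingContext are
illustrative names, not part of this commit:

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.MapContext;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;

    public class LoggingWrappedMapper<KI, VI, KO, VO>
        extends WrappedMapper<KI, VI, KO, VO> {

      /** A context that reports record counts as they are written. */
      public class LoggingContext extends Context {
        private long written = 0;

        public LoggingContext(MapContext<KI, VI, KO, VO> mapContext) {
          super(mapContext);
        }

        @Override
        public void write(KO key, VO value)
            throws IOException, InterruptedException {
          if (++written % 1000 == 0) {
            setStatus("wrote " + written + " records");  // observe, then delegate
          }
          super.write(key, value);
        }
      }

      @Override
      public Mapper<KI, VI, KO, VO>.Context
      getMapContext(MapContext<KI, VI, KO, VO> mapContext) {
        return new LoggingContext(mapContext);
      }
    }
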
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java Fri Sep 18 23:52:56 2009
@@ -21,6 +21,7 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import java.io.IOException;
@@ -374,8 +375,9 @@
   public void write(KEYOUT key, VALUEOUT value, String baseOutputPath) 
       throws IOException, InterruptedException {
     checkBaseOutputPath(baseOutputPath);
-    TaskAttemptContext taskContext = new TaskAttemptContext(
-      context.getConfiguration(), context.getTaskAttemptID());
+    TaskAttemptContext taskContext = 
+      new TaskAttemptContextImpl(context.getConfiguration(), 
+                                 context.getTaskAttemptID());
     getRecordWriter(taskContext, baseOutputPath).write(key, value);
   }
 
@@ -422,8 +424,9 @@
     job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
     job.setOutputKeyClass(getNamedOutputKeyClass(context, nameOutput));
     job.setOutputValueClass(getNamedOutputValueClass(context, nameOutput));
-    TaskAttemptContext taskContext = new TaskAttemptContext(
-      job.getConfiguration(), context.getTaskAttemptID());
+    TaskAttemptContext taskContext = 
+      new TaskAttemptContextImpl(job.getConfiguration(), 
+                                 context.getTaskAttemptID());
     return taskContext;
   }
   

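The driver behind this hunk (and the InputSampler one below) is that
TaskAttemptContext is now an interface, so it can no longer be instantiated
directly; callers construct the task-side implementation instead. A minimal
sketch of the new idiom (ContextConstruction is an illustrative name):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

    public class ContextConstruction {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Before: new TaskAttemptContext(conf, id) -- now impossible.
        // After: go through the implementation class.
        TaskAttemptContext ctx =
            new TaskAttemptContextImpl(conf, new TaskAttemptID());
        System.out.println(ctx.getTaskAttemptID());
      }
    }
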
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java Fri Sep 18 23:52:56 2009
@@ -42,6 +42,7 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -131,7 +132,8 @@
       for (int i = 0; i < splitsToSample; ++i) {
         RecordReader<K,V> reader = inf.createRecordReader(
           splits.get(i * splitStep), 
-          new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));
+          new TaskAttemptContextImpl(job.getConfiguration(), 
+                                     new TaskAttemptID()));
         while (reader.nextKeyValue()) {
           samples.add(reader.getCurrentKey());
           ++records;
@@ -209,7 +211,8 @@
       for (int i = 0; i < splitsToSample ||
                      (i < splits.size() && samples.size() < numSamples); ++i) {
         RecordReader<K,V> reader = inf.createRecordReader(splits.get(i), 
-          new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));
+          new TaskAttemptContextImpl(job.getConfiguration(), 
+                                     new TaskAttemptID()));
         while (reader.nextKeyValue()) {
           if (r.nextDouble() <= freq) {
             if (samples.size() < numSamples) {
@@ -277,7 +280,8 @@
       for (int i = 0; i < splitsToSample; ++i) {
         RecordReader<K,V> reader = inf.createRecordReader(
           splits.get(i * splitStep),
-          new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));
+          new TaskAttemptContextImpl(job.getConfiguration(), 
+                                     new TaskAttemptID()));
         while (reader.nextKeyValue()) {
           ++records;
           if ((double) kept / records < freq) {

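InputSampler's public surface is unchanged by this hunk; only the internal
context construction moves to TaskAttemptContextImpl. For context, a hedged
sketch of typical usage with a total-order sort, assuming the new-API
TotalOrderPartitioner in this tree and an illustrative partition-file path:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
    import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

    public class SamplerExample {
      public static void main(String[] args) throws Exception {
        Job job = new Job();
        job.setNumReduceTasks(4);
        job.setPartitionerClass(TotalOrderPartitioner.class);
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
                                               new Path("/tmp/partitions"));
        // Sample ~1% of records: at most 10000 samples from up to 10 splits.
        InputSampler.Sampler<Text, NullWritable> sampler =
            new InputSampler.RandomSampler<Text, NullWritable>(0.01, 10000, 10);
        InputSampler.writePartitionFile(job, sampler);
      }
    }
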
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java?rev=816816&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java Fri Sep 18 23:52:56 2009
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.reduce;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.ReduceContext;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * A {@link Reducer} which wraps a given one to allow for custom 
+ * {@link Reducer.Context} implementations.
+ */
+public class WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
+    extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+
+  /**
+   * Get a wrapped {@link Reducer.Context} for custom implementations.
+   * @param reduceContext <code>ReduceContext</code> to be wrapped
+   * @return a wrapped <code>Reducer.Context</code> for custom implementations
+   */
+  public Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context 
+  getReducerContext(ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext) {
+    return new Context(reduceContext);
+  }
+  
+  public class Context 
+      extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context {
+
+    protected ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext;
+
+    public Context(
+        ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext) {
+      this.reduceContext = reduceContext;
+    }
+
+    @Override
+    public KEYIN getCurrentKey() throws IOException, InterruptedException {
+      return reduceContext.getCurrentKey();
+    }
+
+    @Override
+    public VALUEIN getCurrentValue() throws IOException, InterruptedException {
+      return reduceContext.getCurrentValue();
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      return reduceContext.nextKeyValue();
+    }
+
+    @Override
+    public Counter getCounter(Enum<?> counterName) {
+      return reduceContext.getCounter(counterName);
+    }
+
+    @Override
+    public Counter getCounter(String groupName, String counterName) {
+      return reduceContext.getCounter(groupName, counterName);
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter() {
+      return reduceContext.getOutputCommitter();
+    }
+
+    @Override
+    public void write(KEYOUT key, VALUEOUT value) throws IOException,
+        InterruptedException {
+      reduceContext.write(key, value);
+    }
+
+    @Override
+    public String getStatus() {
+      return reduceContext.getStatus();
+    }
+
+    @Override
+    public TaskAttemptID getTaskAttemptID() {
+      return reduceContext.getTaskAttemptID();
+    }
+
+    @Override
+    public void setStatus(String msg) {
+      reduceContext.setStatus(msg);
+    }
+
+    @Override
+    public Path[] getArchiveClassPaths() {
+      return reduceContext.getArchiveClassPaths();
+    }
+
+    @Override
+    public String[] getArchiveTimestamps() {
+      return reduceContext.getArchiveTimestamps();
+    }
+
+    @Override
+    public URI[] getCacheArchives() throws IOException {
+      return reduceContext.getCacheArchives();
+    }
+
+    @Override
+    public URI[] getCacheFiles() throws IOException {
+      return reduceContext.getCacheFiles();
+    }
+
+    @Override
+    public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass()
+        throws ClassNotFoundException {
+      return reduceContext.getCombinerClass();
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+      return reduceContext.getConfiguration();
+    }
+
+    @Override
+    public Path[] getFileClassPaths() {
+      return reduceContext.getFileClassPaths();
+    }
+
+    @Override
+    public String[] getFileTimestamps() {
+      return reduceContext.getFileTimestamps();
+    }
+
+    @Override
+    public RawComparator<?> getGroupingComparator() {
+      return reduceContext.getGroupingComparator();
+    }
+
+    @Override
+    public Class<? extends InputFormat<?, ?>> getInputFormatClass()
+        throws ClassNotFoundException {
+      return reduceContext.getInputFormatClass();
+    }
+
+    @Override
+    public String getJar() {
+      return reduceContext.getJar();
+    }
+
+    @Override
+    public JobID getJobID() {
+      return reduceContext.getJobID();
+    }
+
+    @Override
+    public String getJobName() {
+      return reduceContext.getJobName();
+    }
+
+    @Override
+    public boolean getJobSetupCleanupNeeded() {
+      return reduceContext.getJobSetupCleanupNeeded();
+    }
+
+    @Override
+    public Path[] getLocalCacheArchives() throws IOException {
+      return reduceContext.getLocalCacheArchives();
+    }
+
+    @Override
+    public Path[] getLocalCacheFiles() throws IOException {
+      return reduceContext.getLocalCacheFiles();
+    }
+
+    @Override
+    public Class<?> getMapOutputKeyClass() {
+      return reduceContext.getMapOutputKeyClass();
+    }
+
+    @Override
+    public Class<?> getMapOutputValueClass() {
+      return reduceContext.getMapOutputValueClass();
+    }
+
+    @Override
+    public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass()
+        throws ClassNotFoundException {
+      return reduceContext.getMapperClass();
+    }
+
+    @Override
+    public int getMaxMapAttempts() {
+      return reduceContext.getMaxMapAttempts();
+    }
+
+    @Override
+    public int getMaxReduceAttempts() {
+      return reduceContext.getMaxReduceAttempts();
+    }
+
+    @Override
+    public int getNumReduceTasks() {
+      return reduceContext.getNumReduceTasks();
+    }
+
+    @Override
+    public Class<? extends OutputFormat<?, ?>> getOutputFormatClass()
+        throws ClassNotFoundException {
+      return reduceContext.getOutputFormatClass();
+    }
+
+    @Override
+    public Class<?> getOutputKeyClass() {
+      return reduceContext.getOutputKeyClass();
+    }
+
+    @Override
+    public Class<?> getOutputValueClass() {
+      return reduceContext.getOutputValueClass();
+    }
+
+    @Override
+    public Class<? extends Partitioner<?, ?>> getPartitionerClass()
+        throws ClassNotFoundException {
+      return reduceContext.getPartitionerClass();
+    }
+
+    @Override
+    public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass()
+        throws ClassNotFoundException {
+      return reduceContext.getReducerClass();
+    }
+
+    @Override
+    public RawComparator<?> getSortComparator() {
+      return reduceContext.getSortComparator();
+    }
+
+    @Override
+    public boolean getSymlink() {
+      return reduceContext.getSymlink();
+    }
+
+    @Override
+    public Path getWorkingDirectory() throws IOException {
+      return reduceContext.getWorkingDirectory();
+    }
+
+    @Override
+    public void progress() {
+      reduceContext.progress();
+    }
+
+    @Override
+    public Iterable<VALUEIN> getValues() throws IOException,
+        InterruptedException {
+      return reduceContext.getValues();
+    }
+
+    @Override
+    public boolean nextKey() throws IOException, InterruptedException {
+      return reduceContext.nextKey();
+    }
+    
+    @Override
+    public boolean getProfileEnabled() {
+      return reduceContext.getProfileEnabled();
+    }
+
+    @Override
+    public String getProfileParams() {
+      return reduceContext.getProfileParams();
+    }
+
+    @Override
+    public IntegerRanges getProfileTaskRange(boolean isMap) {
+      return reduceContext.getProfileTaskRange(isMap);
+    }
+
+    @Override
+    public String getUser() {
+      return reduceContext.getUser();
+    }
+  }
+}

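WrappedReducer mirrors WrappedMapper on the reduce side. A sketch of how
runtime code might use it to adapt a ReduceContext into the Reducer.Context
that user code expects (ReducerDriver is an illustrative name):

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.ReduceContext;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;

    public class ReducerDriver {
      public static <KI, VI, KO, VO> void drive(
          Reducer<KI, VI, KO, VO> reducer,
          ReduceContext<KI, VI, KO, VO> reduceContext)
          throws IOException, InterruptedException {
        // Adapt the ReduceContext to the user-visible Reducer.Context.
        Reducer<KI, VI, KO, VO>.Context ctx =
            new WrappedReducer<KI, VI, KO, VO>().getReducerContext(reduceContext);
        reducer.run(ctx);   // setup(), reduce() per key group, cleanup()
      }
    }
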
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java?rev=816816&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java Fri Sep 18 23:52:56 2009
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.task;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
+
+/**
+ * A read-only view of the job that is provided to the tasks while they
+ * are running.
+ */
+public class JobContextImpl implements JobContext {
+
+  protected final org.apache.hadoop.mapred.JobConf conf;
+  private final JobID jobId;
+  
+  public JobContextImpl(Configuration conf, JobID jobId) {
+    this.conf = new org.apache.hadoop.mapred.JobConf(conf);
+    this.jobId = jobId;
+  }
+
+  /**
+   * Return the configuration for the job.
+   * @return the shared configuration object
+   */
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  /**
+   * Get the unique ID for the job.
+   * @return the object with the job id
+   */
+  public JobID getJobID() {
+    return jobId;
+  }
+  
+  /**
+   * Get the configured number of reduce tasks for this job. Defaults to 
+   * <code>1</code>.
+   * @return the number of reduce tasks for this job.
+   */
+  public int getNumReduceTasks() {
+    return conf.getNumReduceTasks();
+  }
+  
+  /**
+   * Get the current working directory for the default file system.
+   * 
+   * @return the directory name.
+   */
+  public Path getWorkingDirectory() throws IOException {
+    return conf.getWorkingDirectory();
+  }
+
+  /**
+   * Get the key class for the job output data.
+   * @return the key class for the job output data.
+   */
+  public Class<?> getOutputKeyClass() {
+    return conf.getOutputKeyClass();
+  }
+  
+  /**
+   * Get the value class for job outputs.
+   * @return the value class for job outputs.
+   */
+  public Class<?> getOutputValueClass() {
+    return conf.getOutputValueClass();
+  }
+
+  /**
+   * Get the key class for the map output data. If it is not set, use the
+   * (final) output key class. This allows the map output key class to be
+   * different than the final output key class.
+   * @return the map output key class.
+   */
+  public Class<?> getMapOutputKeyClass() {
+    return conf.getMapOutputKeyClass();
+  }
+
+  /**
+   * Get the value class for the map output data. If it is not set, use the
+   * (final) output value class. This allows the map output value class to be
+   * different than the final output value class.
+   *  
+   * @return the map output value class.
+   */
+  public Class<?> getMapOutputValueClass() {
+    return conf.getMapOutputValueClass();
+  }
+
+  /**
+   * Get the user-specified job name. This is only used to identify the 
+   * job to the user.
+   * 
+   * @return the job's name, defaulting to "".
+   */
+  public String getJobName() {
+    return conf.getJobName();
+  }
+
+  /**
+   * Get the {@link InputFormat} class for the job.
+   * 
+   * @return the {@link InputFormat} class for the job.
+   */
+  @SuppressWarnings("unchecked")
+  public Class<? extends InputFormat<?,?>> getInputFormatClass() 
+     throws ClassNotFoundException {
+    return (Class<? extends InputFormat<?,?>>) 
+      conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
+  }
+
+  /**
+   * Get the {@link Mapper} class for the job.
+   * 
+   * @return the {@link Mapper} class for the job.
+   */
+  @SuppressWarnings("unchecked")
+  public Class<? extends Mapper<?,?,?,?>> getMapperClass() 
+     throws ClassNotFoundException {
+    return (Class<? extends Mapper<?,?,?,?>>) 
+      conf.getClass(MAP_CLASS_ATTR, Mapper.class);
+  }
+
+  /**
+   * Get the combiner class for the job.
+   * 
+   * @return the combiner class for the job.
+   */
+  @SuppressWarnings("unchecked")
+  public Class<? extends Reducer<?,?,?,?>> getCombinerClass() 
+     throws ClassNotFoundException {
+    return (Class<? extends Reducer<?,?,?,?>>) 
+      conf.getClass(COMBINE_CLASS_ATTR, null);
+  }
+
+  /**
+   * Get the {@link Reducer} class for the job.
+   * 
+   * @return the {@link Reducer} class for the job.
+   */
+  @SuppressWarnings("unchecked")
+  public Class<? extends Reducer<?,?,?,?>> getReducerClass() 
+     throws ClassNotFoundException {
+    return (Class<? extends Reducer<?,?,?,?>>) 
+      conf.getClass(REDUCE_CLASS_ATTR, Reducer.class);
+  }
+
+  /**
+   * Get the {@link OutputFormat} class for the job.
+   * 
+   * @return the {@link OutputFormat} class for the job.
+   */
+  @SuppressWarnings("unchecked")
+  public Class<? extends OutputFormat<?,?>> getOutputFormatClass() 
+     throws ClassNotFoundException {
+    return (Class<? extends OutputFormat<?,?>>) 
+      conf.getClass(OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class);
+  }
+
+  /**
+   * Get the {@link Partitioner} class for the job.
+   * 
+   * @return the {@link Partitioner} class for the job.
+   */
+  @SuppressWarnings("unchecked")
+  public Class<? extends Partitioner<?,?>> getPartitionerClass() 
+     throws ClassNotFoundException {
+    return (Class<? extends Partitioner<?,?>>) 
+      conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
+  }
+
+  /**
+   * Get the {@link RawComparator} comparator used to compare keys.
+   * 
+   * @return the {@link RawComparator} comparator used to compare keys.
+   */
+  public RawComparator<?> getSortComparator() {
+    return conf.getOutputKeyComparator();
+  }
+
+  /**
+   * Get the pathname of the job's jar.
+   * @return the pathname
+   */
+  public String getJar() {
+    return conf.getJar();
+  }
+
+  /** 
+   * Get the user defined {@link RawComparator} comparator for 
+   * grouping keys of inputs to the reduce.
+   * 
+   * @return comparator set by the user for grouping values.
+   * @see Job#setGroupingComparatorClass(Class) for details.  
+   */
+  public RawComparator<?> getGroupingComparator() {
+    return conf.getOutputValueGroupingComparator();
+  }
+  
+  /**
+   * Get whether job-setup and job-cleanup are needed for the job.
+   * 
+   * @return <code>true</code> if job-setup and job-cleanup are needed
+   */
+  public boolean getJobSetupCleanupNeeded() {
+    return conf.getBoolean("mapred.committer.job.setup.cleanup.needed", true);
+  }
+
+  /**
+   * Check whether symlinks are to be created for the localized cache files
+   * in the current working directory.
+   * @return true if symlinks are to be created, else false
+   */
+  public boolean getSymlink() {
+    return DistributedCache.getSymlink(conf);
+  }
+  
+  /**
+   * Get the archive entries in classpath as an array of Path
+   */
+  public Path[] getArchiveClassPaths() {
+    return DistributedCache.getArchiveClassPaths(conf);
+  }
+
+  /**
+   * Get cache archives set in the Configuration
+   * @return A URI array of the caches set in the Configuration
+   * @throws IOException
+   */
+  public URI[] getCacheArchives() throws IOException {
+    return DistributedCache.getCacheArchives(conf);
+  }
+
+  /**
+   * Get cache files set in the Configuration
+   * @return A URI array of the files set in the Configuration
+   * @throws IOException
+   */
+  public URI[] getCacheFiles() throws IOException {
+    return DistributedCache.getCacheFiles(conf);
+  }
+
+  /**
+   * Return the path array of the localized caches
+   * @return A path array of localized caches
+   * @throws IOException
+   */
+  public Path[] getLocalCacheArchives()
+    throws IOException {
+    return DistributedCache.getLocalCacheArchives(conf);
+  }
+
+  /**
+   * Return the path array of the localized files
+   * @return A path array of localized files
+   * @throws IOException
+   */
+  public Path[] getLocalCacheFiles()
+    throws IOException {
+    return DistributedCache.getLocalCacheFiles(conf);
+  }
+
+  /**
+   * Get the file entries in classpath as an array of Path
+   */
+  public Path[] getFileClassPaths() {
+    return DistributedCache.getFileClassPaths(conf);
+  }
+  
+  /**
+   * Get the timestamps of the archives.  Used by internal
+   * DistributedCache and MapReduce code.
+   * @return a string array of timestamps 
+   */
+  public String[] getArchiveTimestamps() {
+    return DistributedCache.getArchiveTimestamps(conf);
+  }
+
+  /**
+   * Get the timestamps of the files.  Used by internal
+   * DistributedCache and MapReduce code.
+   * @return a string array of timestamps 
+   */
+  public String[] getFileTimestamps() {
+    return DistributedCache.getFileTimestamps(conf);
+  }
+
+  /** 
+   * Get the configured number of maximum attempts that will be made to run a
+   * map task, as specified by the <code>mapred.map.max.attempts</code>
+   * property. If this property is not already set, the default is 4 attempts.
+   *  
+   * @return the max number of attempts per map task.
+   */
+  public int getMaxMapAttempts() {
+    return conf.getMaxMapAttempts();
+  }
+
+  /** 
+   * Get the configured number of maximum attempts that will be made to run a
+   * reduce task, as specified by the <code>mapred.reduce.max.attempts</code>
+   * property. If this property is not already set, the default is 4 attempts.
+   * 
+   * @return the max number of attempts per reduce task.
+   */
+  public int getMaxReduceAttempts() {
+    return conf.getMaxReduceAttempts();
+  }
+
+  /**
+   * Get whether the task profiling is enabled.
+   * @return true if some tasks will be profiled
+   */
+  public boolean getProfileEnabled() {
+    return conf.getProfileEnabled();
+  }
+
+  /**
+   * Get the profiler configuration arguments.
+   *
+   * The default value for this property is
+   * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s"
+   * 
+   * @return the parameters to pass to the task child to configure profiling
+   */
+  public String getProfileParams() {
+    return conf.getProfileParams();
+  }
+
+  /**
+   * Get the range of maps or reduces to profile.
+   * @param isMap is the task a map?
+   * @return the task ranges
+   */
+  public IntegerRanges getProfileTaskRange(boolean isMap) {
+    return conf.getProfileTaskRange(isMap);
+  }
+
+  /**
+   * Get the reported username for this job.
+   * 
+   * @return the username
+   */
+  public String getUser() {
+    return conf.getUser();
+  }
+  
+}

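JobContextImpl wraps a Configuration (as a JobConf) and exposes it through
the read-only JobContext interface. A small sketch of that view, assuming
the pre-0.21 property name mapred.reduce.tasks and an illustrative JobID:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.task.JobContextImpl;

    public class JobView {
      public static void main(String[] args) throws ClassNotFoundException {
        Configuration conf = new Configuration();
        conf.setInt("mapred.reduce.tasks", 4);
        JobContext ctx = new JobContextImpl(conf, new JobID("local", 1));
        System.out.println(ctx.getNumReduceTasks());    // 4
        System.out.println(ctx.getInputFormatClass());  // TextInputFormat default
      }
    }
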
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/MapContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/MapContextImpl.java?rev=816816&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/MapContextImpl.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/MapContextImpl.java Fri Sep 18 23:52:56 2009
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.task;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * The context that is given to the {@link Mapper}.
+ * @param <KEYIN> the key input type to the Mapper
+ * @param <VALUEIN> the value input type to the Mapper
+ * @param <KEYOUT> the key output type from the Mapper
+ * @param <VALUEOUT> the value output type from the Mapper
+ */
+public class MapContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
+    extends TaskInputOutputContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
+    implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+  private RecordReader<KEYIN,VALUEIN> reader;
+  private InputSplit split;
+
+  public MapContextImpl(Configuration conf, TaskAttemptID taskid,
+                        RecordReader<KEYIN,VALUEIN> reader,
+                        RecordWriter<KEYOUT,VALUEOUT> writer,
+                        OutputCommitter committer,
+                        StatusReporter reporter,
+                        InputSplit split) {
+    super(conf, taskid, writer, committer, reporter);
+    this.reader = reader;
+    this.split = split;
+  }
+
+  /**
+   * Get the input split for this map.
+   */
+  public InputSplit getInputSplit() {
+    return split;
+  }
+
+  @Override
+  public KEYIN getCurrentKey() throws IOException, InterruptedException {
+    return reader.getCurrentKey();
+  }
+
+  @Override
+  public VALUEIN getCurrentValue() throws IOException, InterruptedException {
+    return reader.getCurrentValue();
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    return reader.nextKeyValue();
+  }
+
+}

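MapContextImpl supplies the input side (reader and split) on top of
TaskInputOutputContextImpl. A sketch of the wiring a map-task runtime might
do with it and WrappedMapper (runMap and its parameters are illustrative;
MapTask in this tree does the real wiring):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.*;
    import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
    import org.apache.hadoop.mapreduce.task.MapContextImpl;

    public class MapDriver {
      /** Run one mapper over one split. */
      public static <KI, VI, KO, VO> void runMap(
          Configuration conf, TaskAttemptID taskId,
          Mapper<KI, VI, KO, VO> mapper,
          RecordReader<KI, VI> reader, RecordWriter<KO, VO> writer,
          OutputCommitter committer, StatusReporter reporter, InputSplit split)
          throws IOException, InterruptedException {
        MapContext<KI, VI, KO, VO> mapContext =
            new MapContextImpl<KI, VI, KO, VO>(conf, taskId, reader, writer,
                                               committer, reporter, split);
        reader.initialize(split, mapContext);  // a MapContext is a TaskAttemptContext
        mapper.run(new WrappedMapper<KI, VI, KO, VO>().getMapContext(mapContext));
      }
    }
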
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/ReduceContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/ReduceContextImpl.java?rev=816816&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/ReduceContextImpl.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/ReduceContextImpl.java Fri Sep 18 23:52:56 2009
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.task;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapred.BackupStore;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.ReduceContext;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * The context passed to the {@link Reducer}.
+ * @param <KEYIN> the class of the input keys
+ * @param <VALUEIN> the class of the input values
+ * @param <KEYOUT> the class of the output keys
+ * @param <VALUEOUT> the class of the output values
+ */
+public class ReduceContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
+    extends TaskInputOutputContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
+    implements ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+  private RawKeyValueIterator input;
+  private Counter inputValueCounter;
+  private Counter inputKeyCounter;
+  private RawComparator<KEYIN> comparator;
+  private KEYIN key;                                  // current key
+  private VALUEIN value;                              // current value
+  private boolean firstValue = false;                 // first value in key
+  private boolean nextKeyIsSame = false;              // more w/ this key
+  private boolean hasMore;                            // more in file
+  protected Progressable reporter;
+  private Deserializer<KEYIN> keyDeserializer;
+  private Deserializer<VALUEIN> valueDeserializer;
+  private DataInputBuffer buffer = new DataInputBuffer();
+  private BytesWritable currentRawKey = new BytesWritable();
+  private ValueIterable iterable = new ValueIterable();
+  private boolean isMarked = false;
+  private BackupStore<KEYIN,VALUEIN> backupStore;
+  private final SerializationFactory serializationFactory;
+  private final Class<KEYIN> keyClass;
+  private final Class<VALUEIN> valueClass;
+  private final Configuration conf;
+  private final TaskAttemptID taskid;
+  private int currentKeyLength = -1;
+  private int currentValueLength = -1;
+  
+  public ReduceContextImpl(Configuration conf, TaskAttemptID taskid,
+                           RawKeyValueIterator input, 
+                           Counter inputKeyCounter,
+                           Counter inputValueCounter,
+                           RecordWriter<KEYOUT,VALUEOUT> output,
+                           OutputCommitter committer,
+                           StatusReporter reporter,
+                           RawComparator<KEYIN> comparator,
+                           Class<KEYIN> keyClass,
+                           Class<VALUEIN> valueClass
+                          ) throws InterruptedException, IOException{
+    super(conf, taskid, output, committer, reporter);
+    this.input = input;
+    this.inputKeyCounter = inputKeyCounter;
+    this.inputValueCounter = inputValueCounter;
+    this.comparator = comparator;
+    this.serializationFactory = new SerializationFactory(conf);
+    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+    this.keyDeserializer.open(buffer);
+    this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
+    this.valueDeserializer.open(buffer);
+    hasMore = input.next();
+    this.keyClass = keyClass;
+    this.valueClass = valueClass;
+    this.conf = conf;
+    this.taskid = taskid;
+  }
+
+  /** Start processing next unique key. */
+  public boolean nextKey() throws IOException,InterruptedException {
+    while (hasMore && nextKeyIsSame) {
+      nextKeyValue();
+    }
+    if (hasMore) {
+      if (inputKeyCounter != null) {
+        inputKeyCounter.increment(1);
+      }
+      return nextKeyValue();
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Advance to the next key/value pair.
+   */
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (!hasMore) {
+      key = null;
+      value = null;
+      return false;
+    }
+    firstValue = !nextKeyIsSame;
+    DataInputBuffer nextKey = input.getKey();
+    currentRawKey.set(nextKey.getData(), nextKey.getPosition(), 
+                      nextKey.getLength() - nextKey.getPosition());
+    buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
+    key = keyDeserializer.deserialize(key);
+    DataInputBuffer nextVal = input.getValue();
+    buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength());
+    value = valueDeserializer.deserialize(value);
+
+    currentKeyLength = nextKey.getLength() - nextKey.getPosition();
+    currentValueLength = nextVal.getLength() - nextVal.getPosition();
+
+    if (isMarked) {
+      backupStore.write(nextKey, nextVal);
+    }
+
+    hasMore = input.next();
+    if (hasMore) {
+      nextKey = input.getKey();
+      nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, 
+                                     currentRawKey.getLength(),
+                                     nextKey.getData(),
+                                     nextKey.getPosition(),
+                                     nextKey.getLength() - nextKey.getPosition()
+                                         ) == 0;
+    } else {
+      nextKeyIsSame = false;
+    }
+    inputValueCounter.increment(1);
+    return true;
+  }
+
+  public KEYIN getCurrentKey() {
+    return key;
+  }
+
+  @Override
+  public VALUEIN getCurrentValue() {
+    return value;
+  }
+  
+  protected class ValueIterator implements ReduceContext.ValueIterator<VALUEIN> {
+
+    private boolean inReset = false;
+    private boolean clearMarkFlag = false;
+    
+    @Override
+    public boolean hasNext() {
+      try {
+        if (inReset && backupStore.hasNext()) {
+          return true;
+        } 
+      } catch (Exception e) {
+        e.printStackTrace();
+        throw new RuntimeException("hasNext failed", e);
+      }
+      return firstValue || nextKeyIsSame;
+    }
+
+    @Override
+    public VALUEIN next() {
+      if (inReset) {
+        try {
+          if (backupStore.hasNext()) {
+            backupStore.next();
+            DataInputBuffer next = backupStore.nextValue();
+            buffer.reset(next.getData(), next.getPosition(), next.getLength());
+            value = valueDeserializer.deserialize(value);
+            return value;
+          } else {
+            inReset = false;
+            backupStore.exitResetMode();
+            if (clearMarkFlag) {
+              clearMarkFlag = false;
+              isMarked = false;
+            }
+          }
+        } catch (IOException e) {
+          e.printStackTrace();
+          throw new RuntimeException("next value iterator failed", e);
+        }
+      } 
+
+      // if this is the first record, we don't need to advance
+      if (firstValue) {
+        firstValue = false;
+        return value;
+      }
+      // if this isn't the first record and the next key is different, the
+      // iterator can't advance past the current key group here.
+      if (!nextKeyIsSame) {
+        throw new NoSuchElementException("iterate past last value");
+      }
+      // otherwise, go to the next key/value pair
+      try {
+        nextKeyValue();
+        return value;
+      } catch (IOException ie) {
+        throw new RuntimeException("next value iterator failed", ie);
+      } catch (InterruptedException ie) {
+        // this is bad, but we can't modify the exception list of java.util
+        throw new RuntimeException("next value iterator interrupted", ie);        
+      }
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("remove not implemented");
+    }
+
+    @Override
+    public void mark() throws IOException {
+      if (backupStore == null) {
+        backupStore = new BackupStore<KEYIN,VALUEIN>(conf, taskid);
+      }
+      isMarked = true;
+      if (!inReset) {
+        backupStore.reinitialize();
+        if (currentKeyLength == -1) {
+          // The user has not called next() for this iterator yet, so
+          // there is no current record to mark and copy to backup store.
+          return;
+        }
+        assert (currentValueLength != -1);
+        int requestedSize = currentKeyLength + currentValueLength + 
+          WritableUtils.getVIntSize(currentKeyLength) +
+          WritableUtils.getVIntSize(currentValueLength);
+        DataOutputStream out = backupStore.getOutputStream(requestedSize);
+        writeFirstKeyValueBytes(out);
+        backupStore.updateCounters(requestedSize);
+      } else {
+        backupStore.mark();
+      }
+    }
+
+    @Override
+    public void reset() throws IOException {
+      // We reached the end of an iteration and the user calls reset(),
+      // but clearMark() was called earlier; the mark is gone, so clear
+      // it in the backup store and throw.
+      if (clearMarkFlag) {
+        clearMarkFlag = false;
+        backupStore.clearMark();
+        throw new IOException("Reset called without a previous mark");
+      }
+      
+      if (!isMarked) {
+        throw new IOException("Reset called without a previous mark");
+      }
+      inReset = true;
+      backupStore.reset();
+    }
+
+    @Override
+    public void clearMark() throws IOException {
+      if (backupStore == null) {
+        return;
+      }
+      if (inReset) {
+        clearMarkFlag = true;
+        backupStore.clearMark();
+      } else {
+        inReset = isMarked = false;
+        backupStore.reinitialize();
+      }
+    }
+    
+    /**
+     * This method is called when the reducer moves from one key to 
+     * another.
+     * @throws IOException
+     */
+    public void resetBackupStore() throws IOException {
+      if (backupStore == null) {
+        return;
+      }
+      inReset = isMarked = false;
+      backupStore.reinitialize();
+      currentKeyLength = -1;
+    }
+
+    /**
+     * This method is called to write the record that was most recently
+     * served (before a call to the mark). Since the framework reads one
+     * record in advance, to get this record, we serialize the current key
+     * and value
+     * @param out
+     * @throws IOException
+     */
+    private void writeFirstKeyValueBytes(DataOutputStream out) 
+    throws IOException {
+      assert (getCurrentKey() != null && getCurrentValue() != null);
+      WritableUtils.writeVInt(out, currentKeyLength);
+      WritableUtils.writeVInt(out, currentValueLength);
+      Serializer<KEYIN> keySerializer = 
+        serializationFactory.getSerializer(keyClass);
+      keySerializer.open(out);
+      keySerializer.serialize(getCurrentKey());
+
+      Serializer<VALUEIN> valueSerializer = 
+        serializationFactory.getSerializer(valueClass);
+      valueSerializer.open(out);
+      valueSerializer.serialize(getCurrentValue());
+    }
+  }
+
+  protected class ValueIterable implements Iterable<VALUEIN> {
+    private ValueIterator iterator = new ValueIterator();
+    @Override
+    public Iterator<VALUEIN> iterator() {
+      return iterator;
+    } 
+  }
+  
+  /**
+   * Iterate through the values for the current key, reusing the same value 
+   * object, which is stored in the context.
+   * @return the series of values associated with the current key. All of the 
+   * objects returned directly and indirectly from this method are reused.
+   */
+  public 
+  Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
+    return iterable;
+  }
+}

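The mark/reset support in ReduceContextImpl's ValueIterator (backed by
BackupStore) is what makes multiple passes over a key's values possible. A
hedged sketch of a reducer exploiting it; the cast assumes the iterator
handed to reduce() is this implementation's ValueIterator, and
TwoPassReducer is an illustrative name:

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.ReduceContext;
    import org.apache.hadoop.mapreduce.Reducer;

    public class TwoPassReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {
      @Override
      protected void reduce(Text key, Iterable<IntWritable> values, Context ctx)
          throws IOException, InterruptedException {
        @SuppressWarnings("unchecked")
        ReduceContext.ValueIterator<IntWritable> it =
            (ReduceContext.ValueIterator<IntWritable>) values.iterator();
        it.mark();                      // remember the start of the key group
        int max = Integer.MIN_VALUE;
        while (it.hasNext()) {          // first pass: find the maximum
          max = Math.max(max, it.next().get());
        }
        it.reset();                     // rewind to the mark
        while (it.hasNext()) {          // second pass: emit the maxima
          IntWritable v = it.next();
          if (v.get() == max) ctx.write(key, v);
        }
      }
    }
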
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java?rev=816816&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java Fri Sep 18 23:52:56 2009
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.task;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * The context for task attempts.
+ */
+public class TaskAttemptContextImpl extends JobContextImpl 
+    implements TaskAttemptContext {
+  private final TaskAttemptID taskId;
+  private String status = "";
+  
+  public TaskAttemptContextImpl(Configuration conf, 
+                                TaskAttemptID taskId) {
+    super(conf, taskId.getJobID());
+    this.taskId = taskId;
+  }
+
+  /**
+   * Get the unique name for this task attempt.
+   */
+  public TaskAttemptID getTaskAttemptID() {
+    return taskId;
+  }
+
+  /**
+   * Set the current status of the task to the given string.
+   */
+  public void setStatus(String msg) {
+    status = msg;
+  }
+
+  /**
+   * Get the last set status message.
+   * @return the current status message
+   */
+  public String getStatus() {
+    return status;
+  }
+
+  /**
+   * Report progress. The subtypes actually do work in this method.
+   */
+  @Override
+  public void progress() { 
+  }
+    
+}
\ No newline at end of file

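TaskAttemptContextImpl leaves progress() empty on purpose; per its Javadoc
the subtypes do the actual reporting. A minimal sketch of such a subtype,
assuming an injected Progressable (ReportingContext is an illustrative
name):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
    import org.apache.hadoop.util.Progressable;

    public class ReportingContext extends TaskAttemptContextImpl {
      private final Progressable reporter;

      public ReportingContext(Configuration conf, TaskAttemptID id,
                              Progressable reporter) {
        super(conf, id);
        this.reporter = reporter;
      }

      @Override
      public void progress() {
        reporter.progress();   // forward instead of the base no-op
      }
    }
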
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/TaskInputOutputContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/TaskInputOutputContextImpl.java?rev=816816&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/TaskInputOutputContextImpl.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/TaskInputOutputContextImpl.java Fri Sep 18 23:52:56 2009
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.task;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * A context object that allows input and output from the task. It is only
+ * supplied to the {@link Mapper} or {@link Reducer}.
+ * @param <KEYIN> the input key type for the task
+ * @param <VALUEIN> the input value type for the task
+ * @param <KEYOUT> the output key type for the task
+ * @param <VALUEOUT> the output value type for the task
+ */
+public abstract class TaskInputOutputContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
+       extends TaskAttemptContextImpl 
+       implements TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+  private RecordWriter<KEYOUT,VALUEOUT> output;
+  private StatusReporter reporter;
+  private OutputCommitter committer;
+
+  public TaskInputOutputContextImpl(Configuration conf, TaskAttemptID taskid,
+                                    RecordWriter<KEYOUT,VALUEOUT> output,
+                                    OutputCommitter committer,
+                                    StatusReporter reporter) {
+    super(conf, taskid);
+    this.output = output;
+    this.reporter = reporter;
+    this.committer = committer;
+  }
+
+  /**
+   * Advance to the next key, value pair.
+   * @return true if a key/value pair was read, false at end of input
+   */
+  public abstract 
+  boolean nextKeyValue() throws IOException, InterruptedException;
+ 
+  /**
+   * Get the current key.
+   * @return the current key object or null if there isn't one
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract 
+  KEYIN getCurrentKey() throws IOException, InterruptedException;
+
+  /**
+   * Get the current value.
+   * @return the value object the current record was read into
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract VALUEIN getCurrentValue() throws IOException, 
+                                                   InterruptedException;
+
+  /**
+   * Generate an output key/value pair.
+   */
+  public void write(KEYOUT key, VALUEOUT value
+                    ) throws IOException, InterruptedException {
+    output.write(key, value);
+  }
+
+  public Counter getCounter(Enum<?> counterName) {
+    return reporter.getCounter(counterName);
+  }
+
+  public Counter getCounter(String groupName, String counterName) {
+    return reporter.getCounter(groupName, counterName);
+  }
+
+  public OutputCommitter getOutputCommitter() {
+    return committer;
+  }
+}

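Because the class is abstract, a concrete context must supply the three record-cursor methods declared above. A hypothetical, test-oriented subclass (not part of the commit) that serves exactly one key/value pair, assuming the three abstract methods are the only ones left unimplemented:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.StatusReporter;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl;

    // Hypothetical sketch: a single-record context for unit tests.
    public class SingleRecordContext
        extends TaskInputOutputContextImpl<Text, Text, Text, Text> {
      private final Text key = new Text("k");
      private final Text value = new Text("v");
      private boolean consumed = false;

      public SingleRecordContext(Configuration conf, TaskAttemptID taskid,
                                 RecordWriter<Text, Text> output,
                                 OutputCommitter committer,
                                 StatusReporter reporter) {
        super(conf, taskid, output, committer, reporter);
      }

      @Override
      public boolean nextKeyValue() throws IOException, InterruptedException {
        if (consumed) {
          return false;      // the single pair has already been served
        }
        consumed = true;     // advance past the only record
        return true;
      }

      @Override
      public Text getCurrentKey() {
        return key;          // valid only after nextKeyValue() returns true
      }

      @Override
      public Text getCurrentValue() {
        return value;
      }
    }
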
Modified: hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml Fri Sep 18 23:52:56 2009
@@ -49,6 +49,10 @@
        <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS" />
      </Match>
      <Match>
+       <Class name="~org.apache.hadoop.mapred.*" />
+       <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" />
+     </Match>
+     <Match>
        <Class name="~org.apache.hadoop.mapred.lib.aggregate.*" />
        <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" />
      </Match>

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java Fri Sep 18 23:52:56 2009
@@ -23,6 +23,8 @@
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
 
 public class TestFileOutputCommitter extends TestCase {
   private static Path outDir = new Path(
@@ -38,8 +40,8 @@
     job.set(JobContext.TASK_ATTEMPT_ID, attempt);
     job.setOutputCommitter(FileOutputCommitter.class);
     FileOutputFormat.setOutputPath(job, outDir);
-    JobContext jContext = new JobContext(job, taskID.getJobID());
-    TaskAttemptContext tContext = new TaskAttemptContext(job, taskID);
+    JobContext jContext = new JobContextImpl(job, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID);
     FileOutputCommitter committer = new FileOutputCommitter();
     FileOutputFormat.setWorkOutputPath(job, 
       committer.getTempTaskOutputPath(tContext));

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java Fri Sep 18 23:52:56 2009
@@ -140,7 +140,7 @@
             TEST_ROOT_DIR);
     jobId = taskId.getJobID();
     
-    JobContext jContext = new JobContext(job, jobId);
+    JobContext jContext = new JobContextImpl(job, jobId);
     Job.RawSplit[] rawSplits = LocalJobRunner.getRawSplits(jContext, job);
 
     job.setUseNewMapper(true); // use new api

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java Fri Sep 18 23:52:56 2009
@@ -49,6 +49,7 @@
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /**
@@ -371,7 +372,7 @@
       Configuration conf) {
     TaskAttemptID tid = new TaskAttemptID("jt", 1, TaskType.MAP, 0, 0);
     conf.set(JobContext.TASK_ATTEMPT_ID, tid.toString());
-    return new TaskAttemptContext(conf, tid);    
+    return new TaskAttemptContextImpl(conf, tid);    
   }
 
   public static StatusReporter createDummyReporter() {

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java Fri Sep 18 23:52:56 2009
@@ -30,6 +30,7 @@
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 
 public class TestNoJobSetupCleanup extends HadoopTestCase {
   private static String TEST_ROOT_DIR =
@@ -91,7 +92,7 @@
       super.checkOutputSpecs(job);
       // creating dummy TaskAttemptID
       TaskAttemptID tid = new TaskAttemptID("jt", 1, TaskType.JOB_SETUP, 0, 0);
-      getOutputCommitter(new TaskAttemptContext(job.getConfiguration(), tid)).
+      getOutputCommitter(new TaskAttemptContextImpl(job.getConfiguration(), tid)).
         setupJob(job);
     }
   }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRKeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRKeyValueTextInputFormat.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRKeyValueTextInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRKeyValueTextInputFormat.java Fri Sep 18 23:52:56 2009
@@ -39,6 +39,7 @@
 import org.apache.hadoop.mapreduce.MapReduceTestUtil;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
 import org.apache.hadoop.util.LineReader;
 import org.apache.hadoop.util.ReflectionUtils;
 
@@ -118,7 +119,7 @@
           assertEquals("reader class is KeyValueLineRecordReader.", 
             KeyValueLineRecordReader.class, clazz);
           MapContext<Text, Text, Text, Text> mcontext = 
-            new MapContext<Text, Text, Text, Text>(job.getConfiguration(), 
+            new MapContextImpl<Text, Text, Text, Text>(job.getConfiguration(), 
             context.getTaskAttemptID(), reader, null, null, 
             MapReduceTestUtil.createDummyReporter(), splits.get(j));
           reader.initialize(splits.get(j), mcontext);
@@ -217,7 +218,7 @@
             splits.get(j), context);
           Class<?> clazz = reader.getClass();
           MapContext<Text, Text, Text, Text> mcontext =
-            new MapContext<Text, Text, Text, Text>(job.getConfiguration(),
+            new MapContextImpl<Text, Text, Text, Text>(job.getConfiguration(),
             context.getTaskAttemptID(), reader, null, null,
             MapReduceTestUtil.createDummyReporter(), splits.get(j));
           reader.initialize(splits.get(j), mcontext);
@@ -312,7 +313,7 @@
     RecordReader<Text, Text> reader = format.createRecordReader(split, 
       MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
     MapContext<Text, Text, Text, Text> mcontext = 
-      new MapContext<Text, Text, Text, Text>(conf, 
+      new MapContextImpl<Text, Text, Text, Text>(conf, 
       context.getTaskAttemptID(), reader, null, null,
       MapReduceTestUtil.createDummyReporter(), 
       split);

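The test migration in the remaining hunks is mechanical: every direct construction of the formerly-concrete MapContext becomes a MapContextImpl with the same seven arguments, assigned to the unchanged MapContext type. In isolated form (names exactly as in the hunks; the two nulls stand in for the unused RecordWriter and OutputCommitter):

    MapContext<Text, Text, Text, Text> mcontext =
      new MapContextImpl<Text, Text, Text, Text>(job.getConfiguration(),
        context.getTaskAttemptID(), reader, null, null, // no writer, committer
        MapReduceTestUtil.createDummyReporter(), split);
    reader.initialize(split, mcontext);
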
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsBinaryInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsBinaryInputFormat.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsBinaryInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsBinaryInputFormat.java Fri Sep 18 23:52:56 2009
@@ -30,6 +30,7 @@
 import org.apache.hadoop.mapreduce.MapReduceTestUtil;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
 
 import junit.framework.TestCase;
 
@@ -79,7 +80,7 @@
       RecordReader<BytesWritable, BytesWritable> reader =
             bformat.createRecordReader(split, context);
       MapContext<BytesWritable, BytesWritable, BytesWritable, BytesWritable> 
-        mcontext = new MapContext<BytesWritable, BytesWritable,
+        mcontext = new MapContextImpl<BytesWritable, BytesWritable,
           BytesWritable, BytesWritable>(job.getConfiguration(), 
           context.getTaskAttemptID(), reader, null, null, 
           MapReduceTestUtil.createDummyReporter(), 

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsTextInputFormat.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsTextInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsTextInputFormat.java Fri Sep 18 23:52:56 2009
@@ -30,6 +30,7 @@
 import org.apache.hadoop.mapreduce.MapReduceTestUtil;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
 import org.apache.hadoop.conf.*;
 
 public class TestMRSequenceFileAsTextInputFormat extends TestCase {
@@ -84,7 +85,7 @@
           RecordReader<Text, Text> reader =
             format.createRecordReader(split, context);
           MapContext<Text, Text, Text, Text> mcontext = 
-            new MapContext<Text, Text, Text, Text>(job.getConfiguration(), 
+            new MapContextImpl<Text, Text, Text, Text>(job.getConfiguration(), 
             context.getTaskAttemptID(), reader, null, null, 
             MapReduceTestUtil.createDummyReporter(), 
             split);

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileInputFilter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileInputFilter.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileInputFilter.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileInputFilter.java Fri Sep 18 23:52:56 2009
@@ -33,6 +33,7 @@
 import org.apache.hadoop.mapreduce.MapReduceTestUtil;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
 import org.apache.hadoop.conf.*;
 
 public class TestMRSequenceFileInputFilter extends TestCase {
@@ -96,7 +97,7 @@
       RecordReader<Text, BytesWritable> reader =
         format.createRecordReader(split, context);
       MapContext<Text, BytesWritable, Text, BytesWritable> mcontext = 
-        new MapContext<Text, BytesWritable, Text, BytesWritable>(
+        new MapContextImpl<Text, BytesWritable, Text, BytesWritable>(
         job.getConfiguration(), 
         context.getTaskAttemptID(), reader, null, null, 
         MapReduceTestUtil.createDummyReporter(), split);

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java Fri Sep 18 23:52:56 2009
@@ -26,6 +26,7 @@
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
 
 public class TestNLineInputFormat extends TestCase {
   private static int MAX_LENGTH = 200;
@@ -90,7 +91,7 @@
       assertEquals("reader class is LineRecordReader.", 
         LineRecordReader.class, clazz);
       MapContext<LongWritable, Text, LongWritable, Text> mcontext = 
-        new MapContext<LongWritable, Text, LongWritable, Text>(
+        new MapContextImpl<LongWritable, Text, LongWritable, Text>(
           job.getConfiguration(), context.getTaskAttemptID(), reader, null,
           null, MapReduceTestUtil.createDummyReporter(), splits.get(i));
       reader.initialize(splits.get(i), mcontext);

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestJoinProperties.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestJoinProperties.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestJoinProperties.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestJoinProperties.java Fri Sep 18 23:52:56 2009
@@ -35,6 +35,7 @@
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
 
 public class TestJoinProperties extends TestCase {
 
@@ -374,7 +375,7 @@
         RecordReader reader = format.createRecordReader(
             split, context);
       MapContext mcontext = 
-        new MapContext(conf, 
+        new MapContextImpl(conf, 
         context.getTaskAttemptID(), reader, null, null, 
         MapReduceTestUtil.createDummyReporter(), split);
       reader.initialize(split, mcontext);

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestWrappedRRClassloader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestWrappedRRClassloader.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestWrappedRRClassloader.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestWrappedRRClassloader.java Fri Sep 18 23:52:56 2009
@@ -25,6 +25,7 @@
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.MapReduceTestUtil.Fake_RR;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 
 public class TestWrappedRRClassloader extends TestCase {
   /**
@@ -54,7 +55,7 @@
     TaskAttemptID tid = new TaskAttemptID("jt", 1, TaskType.MAP, 0, 0);
     conf.set(JobContext.TASK_ATTEMPT_ID, tid.toString());
     inputFormat.createRecordReader(inputFormat.getSplits(new Job(conf)).get(0), 
-      new TaskAttemptContext(conf, tid));
+      new TaskAttemptContextImpl(conf, tid));
   }
 
   public static class Fake_ClassLoader extends ClassLoader {

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java?rev=816816&r1=816815&r2=816816&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java Fri Sep 18 23:52:56 2009
@@ -37,6 +37,7 @@
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
 
 import junit.framework.TestCase;
 import org.apache.commons.logging.*;
@@ -112,7 +113,7 @@
       RecordReader<IntWritable, DoubleWritable> reader =
         iformat.createRecordReader(split, context);
       MapContext<IntWritable, DoubleWritable, BytesWritable, BytesWritable> 
-        mcontext = new MapContext<IntWritable, DoubleWritable,
+        mcontext = new MapContextImpl<IntWritable, DoubleWritable,
           BytesWritable, BytesWritable>(job.getConfiguration(), 
           context.getTaskAttemptID(), reader, null, null, 
           MapReduceTestUtil.createDummyReporter(), 


