hadoop-common-commits mailing list archives

From omal...@apache.org
Subject svn commit: r679845 [1/3] - in /hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce: ./ example/ lib/ lib/input/ lib/map/ lib/output/ lib/partition/ lib/reduce/
Date Fri, 25 Jul 2008 15:57:42 GMT
Author: omalley
Date: Fri Jul 25 08:57:41 2008
New Revision: 679845

URL: http://svn.apache.org/viewvc?rev=679845&view=rev
Log:
HADOOP-1230. This is a proposed patch that fills in more of the new API. Doug 
suggested checking it in, because it will be easier to review and is not called
by any code. This code compiles, but has not been tested at all...

Added:
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Counter.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/CounterGroup.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ID.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/InputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/InputSplit.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobID.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MapContext.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Mapper.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/OutputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Partitioner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/RecordReader.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/RecordWriter.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Reducer.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskAttemptContext.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskAttemptID.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskID.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskInputOutputContext.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/example/
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/example/WordCount.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileSplit.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/InvalidInputException.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileRecordReader.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/InverseMapper.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/TokenCounterMapper.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/HashPartitioner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/IntSumReducer.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/LongSumReducer.java

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Counter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Counter.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Counter.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Counter.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * A named counter that tracks the progress of a map/reduce job.
+ * 
+ * <p><code>Counters</code> represent global counters, defined either by the 
+ * Map-Reduce framework or applications. Each <code>Counter</code> is named by
+ * an {@link Enum} and has a long as its value.</p>
+ * 
+ * <p><code>Counters</code> are bunched into groups, each comprising
+ * counters from a particular <code>Enum</code> class.</p>
+ */
+public class Counter implements Writable {
+
+  private String displayName;
+  private long value;
+    
+  Counter() { 
+    value = 0L;
+  }
+
+  Counter(String displayName, long value) {
+    this.displayName = displayName;
+    this.value = value;
+  }
+    
+  /**
+   * Read the binary representation of the counter
+   */
+  public synchronized void readFields(DataInput in) throws IOException {
+    displayName = Text.readString(in);
+    value = WritableUtils.readVLong(in);
+  }
+    
+  /**
+   * Write the binary representation of the counter
+   */
+  public synchronized void write(DataOutput out) throws IOException {
+    Text.writeString(out, displayName);
+    WritableUtils.writeVLong(out, value);
+  }
+    
+  /**
+   * Get the name of the counter.
+   * @return the user facing name of the counter
+   */
+  public String getDisplayName() {
+    return displayName;
+  }
+    
+  /**
+   * What is the current value of this counter?
+   * @return the current value
+   */
+  public synchronized long getValue() {
+    return value;
+  }
+    
+  /**
+   * Increment this counter by the given value
+   * @param incr the value to increase this counter by
+   */
+  public synchronized void increment(long incr) {
+    value += incr;
+  }
+}
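
As a rough sketch of the Writable contract above, the following round-trips a Counter through its binary form. It assumes a caller inside the org.apache.hadoop.mapreduce package itself, since the constructors are package-private; the class and counter names are illustrative only.

    package org.apache.hadoop.mapreduce;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class CounterRoundTrip {
      public static void main(String[] args) throws IOException {
        // write() emits the display name as a Text string, then the value
        // as a variable-length long.
        Counter original = new Counter("Map input records", 42L);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // readFields() restores both fields into a fresh instance.
        Counter copy = new Counter();
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy.getDisplayName() + " = " + copy.getValue());
        // prints: Map input records = 42
      }
    }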

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/CounterGroup.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/CounterGroup.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/CounterGroup.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/CounterGroup.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+/**
+ * A group of {@link Counter}s that logically belong together. Typically,
+ * it is an {@link Enum} subclass and the counters are the values.
+ */
+public abstract class CounterGroup implements Iterable<Counter> {
+  abstract public String getName();
+}

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ID.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ID.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ID.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ID.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * A general identifier, which internally stores the id
+ * as an integer. This is the super class of {@link JobID}, 
+ * {@link TaskID} and {@link TaskAttemptID}.
+ * 
+ * @see JobID
+ * @see TaskID
+ * @see TaskAttemptID
+ */
+public class ID implements WritableComparable<ID> {
+  protected int id;
+
+  /** constructs an ID object from the given int */
+  public ID(int id) {
+    this.id = id;
+  }
+
+  protected ID() {
+  }
+
+  /** returns the int which represents the identifier */
+  public int getId() {
+    return id;
+  }
+
+  @Override
+  public String toString() {
+    return String.valueOf(id);
+  }
+
+  @Override
+  public int hashCode() {
+    return Integer.valueOf(id).hashCode();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if(o == null)
+      return false;
+    if (o.getClass().equals(ID.class)) {
+      ID that = (ID) o;
+      return this.id == that.id;
+    }
+    else
+      return false;
+  }
+
+  /** Compare IDs by associated numbers */
+  public int compareTo(ID that) {
+    return this.id - that.id;
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    this.id = in.readInt();
+  }
+
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(id);
+  }
+
+  public static ID read(DataInput in) throws IOException {
+    ID id = new ID();
+    id.readFields(in);
+    return id;
+  }
+
+  /**
+   * Construct an ID object from given string
+   * 
+   * @return constructed ID object or null if the given string is null
+   * @throws IllegalArgumentException if the given string is malformed
+   */
+  public static ID forName(String str) throws IllegalArgumentException {
+    if (str == null)
+      return null;
+    try {
+      int id = Integer.parseInt(str);
+      return new ID(id);
+    }
+    catch (Exception ex) {
+      throw new IllegalArgumentException("Id string : " + str
+          + " is not propoerly formed");
+    }
+  }
+}
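
A brief sketch of the public ID contract, using only the methods above; the numeric values are arbitrary.

    import org.apache.hadoop.mapreduce.ID;

    public class IdDemo {
      public static void main(String[] args) {
        ID seven = ID.forName("7");    // parses a decimal string
        ID twelve = new ID(12);

        System.out.println(seven.compareTo(twelve) < 0);   // true: 7 sorts before 12
        System.out.println(seven.equals(ID.forName("7"))); // true: same numeric id
        System.out.println(seven);                         // "7"

        try {
          ID.forName("seven");         // malformed: not a decimal integer
        } catch (IllegalArgumentException expected) {
          System.out.println("malformed: " + expected.getMessage());
        }
      }
    }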

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/InputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/InputFormat.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/InputFormat.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/InputFormat.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/** 
+ * <code>InputFormat</code> describes the input-specification for a 
+ * Map-Reduce job. 
+ * 
+ * <p>The Map-Reduce framework relies on the <code>InputFormat</code> of the
+ * job to:</p>
+ * <ol>
+ *   <li>
+ *   Validate the input-specification of the job. 
+ *   </li>
+ *   <li>
+ *   Split-up the input file(s) into logical {@link InputSplit}s, each of 
+ *   which is then assigned to an individual {@link Mapper}.
+ *   </li>
+ *   <li>
+ *   Provide the {@link RecordReader} implementation to be used to glean
+ *   input records from the logical <code>InputSplit</code> for processing by 
+ *   the {@link Mapper}.
+ *   </li>
+ * </ol>
+ * 
+ * <p>The default behavior of file-based {@link InputFormat}s, typically 
+ * sub-classes of {@link FileInputFormat}, is to split the 
+ * input into <i>logical</i> {@link InputSplit}s based on the total size, in 
+ * bytes, of the input files. However, the {@link FileSystem} blocksize of  
+ * the input files is treated as an upper bound for input splits. A lower bound 
+ * on the split size can be set via 
+ * <a href="{@docRoot}/../hadoop-default.html#mapred.min.split.size">
+ * mapred.min.split.size</a>.</p>
+ * 
+ * <p>Clearly, logical splits based on input size are insufficient for many 
+ * applications, since record boundaries must be respected. In such cases, the
+ * application also has to implement a {@link RecordReader}, whose
+ * responsibility it is to respect record boundaries and present a
+ * record-oriented view of the logical <code>InputSplit</code> to the
+ * individual task.</p>
+ *
+ * @see InputSplit
+ * @see RecordReader
+ * @see FileInputFormat
+ */
+public abstract class InputFormat<K, V> {
+
+  /** 
+   * Logically split the set of input files for the job.  
+   * 
+   * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
+   * for processing.</p>
+   *
+   * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
+   * input files are not physically split into chunks. For example, a split
+   * could be an <i>&lt;input-file-path, start, offset&gt;</i> tuple. The
+   * InputFormat also creates the {@link RecordReader} to read the
+   * {@link InputSplit}.
+   * 
+   * @param context the job context.
+   * @return a list of {@link InputSplit}s for the job.
+   */
+  public abstract 
+    List<InputSplit> getSplits(JobContext context
+                               ) throws IOException, InterruptedException;
+  
+  /**
+   * Create a record reader for a given split. The framework will call
+   * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before
+   * the split is used.
+   * @param split the split to be read
+   * @param context the information about the task
+   * @return a new record reader
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract 
+    RecordReader<K,V> createRecordReader(InputSplit split,
+                                         TaskAttemptContext context
+                                        ) throws IOException, 
+                                                 InterruptedException;
+
+}
+
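
To make the two responsibilities concrete, here is a hypothetical InputFormat that offers the whole input as a single split. Everything in it is illustrative; in particular, RecordReader's abstract methods fall outside this excerpt, so createRecordReader is left as a stub rather than guessed at.

    package org.apache.hadoop.mapreduce;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;

    public class SingleSplitInputFormat extends InputFormat<Object, Object> {

      /** A degenerate split standing in for "the whole input". */
      private static class WholeInput extends InputSplit {
        @Override
        public long getLength() { return 0; }            // size unknown here
        @Override
        public String[] getLocations() { return new String[0]; }
        public void write(DataOutput out) throws IOException { }
        public void readFields(DataInput in) throws IOException { }
      }

      @Override
      public List<InputSplit> getSplits(JobContext context)
          throws IOException, InterruptedException {
        // One logical split; a real format would consult the job's input
        // specification via the context.
        return Collections.<InputSplit>singletonList(new WholeInput());
      }

      @Override
      public RecordReader<Object, Object> createRecordReader(
          InputSplit split, TaskAttemptContext context)
          throws IOException, InterruptedException {
        // RecordReader's API is not shown in this excerpt; stub only.
        throw new UnsupportedOperationException("sketch only");
      }
    }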

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/InputSplit.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/InputSplit.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/InputSplit.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/InputSplit.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+
+/**
+ * <code>InputSplit</code> represents the data to be processed by an 
+ * individual {@link Mapper}. 
+ *
+ * <p>Typically, it presents a byte-oriented view of the input, and it is the 
+ * responsibility of the job's {@link RecordReader} to process this and present
+ * a record-oriented view. Although the type is <code>Writable</code>, only the
+ * <code>InputFormat</code> that created a split needs to understand its
+ * serialized form.</p>
+ * 
+ * @see InputFormat
+ * @see RecordReader
+ */
+public abstract class InputSplit implements Writable {
+  /**
+   * Get the size of the split, so that the input splits can be sorted by size.
+   * @return the number of bytes in the split
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract long getLength() throws IOException, InterruptedException;
+
+  /**
+   * Get the list of nodes by name where the data for the split would be local.
+   * The locations do not need to be serialized by calls to 
+   * {@link Writable#write(java.io.DataOutput)}
+   * @return a new array of the node names.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract 
+    String[] getLocations() throws IOException, InterruptedException;
+}
\ No newline at end of file
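
A hypothetical concrete split illustrating the contract: the length supports sorting, the locations are purely scheduling hints, and, per the note above, the hints are deliberately left out of the serialized form.

    package org.apache.hadoop.mapreduce;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableUtils;

    public class ByteRangeSplit extends InputSplit {
      private long length;
      private String[] hosts;

      public ByteRangeSplit() { }                  // required for readFields()

      public ByteRangeSplit(long length, String[] hosts) {
        this.length = length;
        this.hosts = hosts;
      }

      @Override
      public long getLength() { return length; }

      @Override
      public String[] getLocations() { return hosts; }

      public void write(DataOutput out) throws IOException {
        // Locations are scheduling hints only and are not serialized.
        WritableUtils.writeVLong(out, length);
      }

      public void readFields(DataInput in) throws IOException {
        length = WritableUtils.readVLong(in);
        hosts = new String[0];                     // hints are lost on the wire
      }
    }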

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
+
+/**
+ * The job submitter's view of the Job. It allows the user to configure the
+ * job, submit it, control its execution, and query the state.
+ */
+public class Job extends JobContext {  
+  
+  public Job() {
+    this(new Configuration());
+  }
+
+  public Job(Configuration conf) {
+    super(conf, null);
+  }
+
+  public Job(Configuration conf, String jobName) {
+    this(conf);
+    setJobName(jobName);
+  }
+
+  /**
+   * Set the number of reduce tasks for the job.
+   * @param tasks the number of reduce tasks
+   */
+  public void setNumReduceTasks(int tasks) {
+    conf.setInt(NUM_REDUCES_ATTR, tasks);
+  }
+
+  /**
+   * Set the current working directory for the default file system.
+   * 
+   * @param dir the new current working directory.
+   */
+  public void setWorkingDirectory(Path dir) throws IOException {
+    dir = dir.makeQualified(FileSystem.get(conf));
+    conf.set(WORKING_DIR_ATTR, dir.toString());
+  }
+
+  /**
+   * Set the {@link InputFormat} for the job.
+   * @param cls the <code>InputFormat</code> to use
+   */
+  public void setInputFormatClass(Class<? extends InputFormat<?,?>> cls) {
+    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, InputFormat.class);
+  }
+
+  /**
+   * Set the {@link OutputFormat} for the job.
+   * @param cls the <code>OutputFormat</code> to use
+   */
+  public void setOutputFormatClass(Class<? extends OutputFormat<?,?>> cls) {
+    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, OutputFormat.class);
+  }
+
+  /**
+   * Set the {@link Mapper} for the job.
+   * @param cls the <code>Mapper</code> to use
+   */
+  public void setMapperClass(Class<? extends Mapper<?,?,?,?>> cls) {
+    conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
+  }
+
+  /**
+   * Set the combiner class for the job.
+   * @param cls the combiner to use
+   */
+  public void setCombinerClass(Class<? extends Reducer<?,?,?,?>> cls) {
+    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
+  }
+
+  /**
+   * Set the {@link Reducer} for the job.
+   * @param cls the <code>Reducer</code> to use
+   */
+  public void setReducerClass(Class<? extends Reducer<?,?,?,?>> cls) {
+    conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
+  }
+
+  /**
+   * Set the {@link Partitioner} for the job.
+   * @param cls the <code>Partitioner</code> to use
+   */
+  public void setPartitionerClass(Class<? extends Partitioner<?,?>> cls) {
+    conf.setClass(PARTITIONER_CLASS_ATTR, cls, Partitioner.class);
+  }
+
+  /**
+   * Set the key class for the map output data. This allows the user to
+   * specify the map output key class to be different than the final output
+   * key class.
+   * 
+   * @param theClass the map output key class.
+   */
+  public void setMapOutputKeyClass(Class<?> theClass) {
+    conf.setClass(MAP_OUTPUT_KEY_CLASS_ATTR, theClass, Object.class);
+  }
+
+  /**
+   * Set the value class for the map output data. This allows the user to
+   * specify the map output value class to be different than the final output
+   * value class.
+   * 
+   * @param theClass the map output value class.
+   */
+  public void setMapOutputValueClass(Class<?> theClass) {
+    conf.setClass(MAP_OUTPUT_VALUE_CLASS_ATTR, theClass, Object.class);
+  }
+
+  /**
+   * Set the key class for the job output data.
+   * 
+   * @param theClass the key class for the job output data.
+   */
+  public void setOutputKeyClass(Class<?> theClass) {
+    conf.setClass(OUTPUT_KEY_CLASS_ATTR, theClass, Object.class);
+  }
+
+  /**
+   * Set the value class for job outputs.
+   * 
+   * @param theClass the value class for job outputs.
+   */
+  public void setOutputValueClass(Class<?> theClass) {
+    conf.setClass(OUTPUT_VALUE_CLASS_ATTR, theClass, Object.class);
+  }
+
+  /**
+   * Define the comparator that controls how the keys are sorted before they
+   * are passed to the {@link Reducer}.
+   * @param cls the raw comparator
+   */
+  public void setSortComparatorClass(Class<? extends RawComparator<?>> cls) {
+    conf.setClass(SORT_COMPARATOR_ATTR, cls, RawComparator.class);
+  }
+
+  /**
+   * Define the comparator that controls which keys are grouped together
+   * for a single call to 
+   * {@link Reducer#reduce(Object, Iterable, org.apache.hadoop.mapreduce.Reducer.Context)}
+   * @param cls the raw comparator to use
+   */
+  public void setGroupingComparatorClass(Class<? extends RawComparator<?>> cls){
+    conf.setClass(GROUPING_COMPARATOR_ATTR, cls, RawComparator.class);
+  }
+
+  /**
+   * Set the user-specified job name.
+   * 
+   * @param name the job's new name.
+   */
+  public void setJobName(String name) {
+    conf.set(JOB_NAME_ATTR, name);
+  }
+
+  /**
+   * Get the URL where some job progress information will be displayed.
+   * 
+   * @return the URL where some job progress information will be displayed.
+   */
+  public String getTrackingURL() {
+    // TODO
+    return null;
+  }
+
+  /**
+   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0 
+   * and 1.0.  When all map tasks have completed, the function returns 1.0.
+   * 
+   * @return the progress of the job's map-tasks.
+   * @throws IOException
+   */
+  public float mapProgress() throws IOException {
+    // TODO
+    return 0.0f;
+  }
+
+  /**
+   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0 
+   * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
+   * 
+   * @return the progress of the job's reduce-tasks.
+   * @throws IOException
+   */
+  public float reduceProgress() throws IOException {
+    // TODO
+    return 0.0f;
+  }
+
+  /**
+   * Check if the job is finished or not. 
+   * This is a non-blocking call.
+   * 
+   * @return <code>true</code> if the job is complete, else <code>false</code>.
+   * @throws IOException
+   */
+  public boolean isComplete() throws IOException {
+    // TODO
+    return false;
+  }
+
+  /**
+   * Check if the job completed successfully. 
+   * 
+   * @return <code>true</code> if the job succeeded, else <code>false</code>.
+   * @throws IOException
+   */
+  public boolean isSuccessful() throws IOException {
+    // TODO
+    return false;
+  }
+
+  /**
+   * Kill the running job.  Blocks until all job tasks have been
+   * killed as well.  If the job is no longer running, it simply returns.
+   * 
+   * @throws IOException
+   */
+  public void killJob() throws IOException {
+    // TODO
+  }
+    
+  /**
+   * Get events indicating completion (success/failure) of component tasks.
+   *  
+   * @param startFrom index to start fetching events from
+   * @return an array of {@link TaskCompletionEvent}s
+   * @throws IOException
+   */
+  public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom
+                                                       ) throws IOException {
+    // TODO
+    return null;
+  }
+  
+  /**
+   * Kill indicated task attempt.
+   * 
+   * @param taskId the id of the task to be terminated.
+   * @throws IOException
+   */
+  public void killTask(TaskAttemptID taskId) throws IOException {
+    // TODO
+  }
+
+  /**
+   * Fail indicated task attempt.
+   * 
+   * @param taskId the id of the task to be terminated.
+   * @throws IOException
+   */
+  public void failTask(TaskAttemptID taskId) throws IOException {
+    // TODO
+  }
+
+  /**
+   * Gets the counters for this job.
+   * 
+   * @return the counters for this job.
+   * @throws IOException
+   */
+  public Iterable<CounterGroup> getCounters() throws IOException {
+    // TODO
+    return null;
+  }
+
+  /**
+   * Submit the job to the cluster and return immediately.
+   * @throws IOException
+   */
+  public void submit() throws IOException {
+    // TODO
+  }
+  
+  /**
+   * Submit the job to the cluster and wait for it to finish.
+   * @return true if the job succeeded
+   * @throws IOException thrown if the communication with the 
+   *         <code>JobTracker</code> is lost
+   */
+  public boolean waitForCompletion() throws IOException {
+    // TODO
+    return false;
+  }
+}
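
Taken together, the setters suggest a driver of roughly the following shape. This is only a sketch: it assumes TokenCounterMapper and IntSumReducer (added under lib/ later in this commit) plug in as shown, input/output path wiring is omitted because the lib formats are not visible here, and submit()/waitForCompletion() are still TODOs above.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;
    import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

    public class WordCountDriver {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "word count");
        job.setMapperClass(TokenCounterMapper.class);
        job.setCombinerClass(IntSumReducer.class);  // local aggregation
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setNumReduceTasks(1);
        // Input/output paths would be configured through the lib/input and
        // lib/output formats (not shown in this part of the diff).
        job.waitForCompletion();                    // a no-op TODO as of r679845
      }
    }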

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobContext.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobContext.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobContext.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A read-only view of the job that is provided to the tasks while they
+ * are running.
+ */
+public class JobContext {
+  // Put all of the attribute names in here so that Job and JobContext are
+  // consistent.
+  protected static final String INPUT_FORMAT_CLASS_ATTR = 
+    "mapreduce.inputformat.class";
+  protected static final String MAP_CLASS_ATTR = "mapreduce.map.class";
+  protected static final String COMBINE_CLASS_ATTR = "mapreduce.combine.class";
+  protected static final String REDUCE_CLASS_ATTR = "mapreduce.reduce.class";
+  protected static final String OUTPUT_FORMAT_CLASS_ATTR = 
+    "mapreduce.outputformat.class";
+  protected static final String OUTPUT_KEY_CLASS_ATTR = 
+    "mapreduce.out.key.class";
+  protected static final String OUTPUT_VALUE_CLASS_ATTR = 
+    "mapreduce.out.value.class";
+  protected static final String MAP_OUTPUT_KEY_CLASS_ATTR = 
+    "mapreduce.map.out.key.class";
+  protected static final String MAP_OUTPUT_VALUE_CLASS_ATTR = 
+    "mapreduce.map.out.value.class";
+  protected static final String NUM_REDUCES_ATTR = "mapreduce.reduce.tasks";
+  protected static final String WORKING_DIR_ATTR = "mapreduce.work.dir";
+  protected static final String JOB_NAME_ATTR = "mapreduce.job.name";
+  protected static final String SORT_COMPARATOR_ATTR = 
+    "mapreduce.sort.comparator";
+  protected static final String GROUPING_COMPARATOR_ATTR = 
+    "mapreduce.grouping.comparator";
+  protected static final String PARTITIONER_CLASS_ATTR = 
+    "mapreduce.partitioner.class";
+
+  protected final Configuration conf;
+  private final JobID jobId;
+  
+  public JobContext(Configuration conf, JobID jobId) {
+    this.conf = conf;
+    this.jobId = jobId;
+  }
+
+  /**
+   * Return the configuration for the job.
+   * @return the shared configuration object
+   */
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  /**
+   * Get the unique ID for the job.
+   * @return the object with the job id
+   */
+  public JobID getJobID() {
+    return jobId;
+  }
+  
+  /**
+   * Get the configured number of reduce tasks for this job. Defaults to 
+   * <code>1</code>.
+   * @return the number of reduce tasks for this job.
+   */
+  public int getNumReduceTasks() {
+    return conf.getInt(NUM_REDUCES_ATTR, 1);
+  }
+  
+  /**
+   * Get the current working directory for the default file system.
+   * 
+   * @return the directory name.
+   */
+  public Path getWorkingDirectory() throws IOException {
+    String name = conf.get(WORKING_DIR_ATTR);
+    if (name != null) {
+      return new Path(name);
+    } else {
+      Path dir = FileSystem.get(conf).getWorkingDirectory();
+      conf.set(WORKING_DIR_ATTR, dir.toString());
+      return dir;
+    }
+  }
+
+  /**
+   * Get the key class for the job output data.
+   * @return the key class for the job output data.
+   */
+  public Class<?> getOutputKeyClass() {
+    return conf.getClass(OUTPUT_KEY_CLASS_ATTR,
+                         LongWritable.class, Object.class);
+  }
+  
+  /**
+   * Get the value class for job outputs.
+   * @return the value class for job outputs.
+   */
+  public Class<?> getOutputValueClass() {
+    return conf.getClass(OUTPUT_VALUE_CLASS_ATTR, Text.class, Object.class);
+  }
+
+  /**
+   * Get the key class for the map output data. If it is not set, use the
+   * (final) output key class. This allows the map output key class to be
+   * different than the final output key class.
+   * @return the map output key class.
+   */
+  public Class<?> getMapOutputKeyClass() {
+    Class<?> retv = conf.getClass(MAP_OUTPUT_KEY_CLASS_ATTR, null, 
+                                  Object.class);
+    if (retv == null) {
+      retv = getOutputKeyClass();
+    }
+    return retv;
+  }
+
+  /**
+   * Get the value class for the map output data. If it is not set, use the
+   * (final) output value class. This allows the map output value class to be
+   * different than the final output value class.
+   *  
+   * @return the map output value class.
+   */
+  public Class<?> getMapOutputValueClass() {
+    Class<?> retv = conf.getClass(MAP_OUTPUT_VALUE_CLASS_ATTR, null,
+        Object.class);
+    if (retv == null) {
+      retv = getOutputValueClass();
+    }
+    return retv;
+  }
+
+  /**
+   * Get the user-specified job name. This is only used to identify the 
+   * job to the user.
+   * 
+   * @return the job's name, defaulting to "".
+   */
+  public String getJobName() {
+    return conf.get(JOB_NAME_ATTR, "");
+  }
+
+  /**
+   * Get the {@link InputFormat} class for the job.
+   * 
+   * @return the {@link InputFormat} class for the job.
+   */
+  @SuppressWarnings("unchecked")
+  public Class<? extends InputFormat<?,?>> getInputFormatClass() 
+     throws ClassNotFoundException {
+    return (Class<? extends InputFormat<?,?>>) 
+      conf.getClass(INPUT_FORMAT_CLASS_ATTR, InputFormat.class);
+  }
+
+  /**
+   * Get the {@link Mapper} class for the job.
+   * 
+   * @return the {@link Mapper} class for the job.
+   */
+  @SuppressWarnings("unchecked")
+  public Class<? extends Mapper<?,?,?,?>> getMapperClass() 
+     throws ClassNotFoundException {
+    return (Class<? extends Mapper<?,?,?,?>>) 
+      conf.getClass(MAP_CLASS_ATTR, Mapper.class);
+  }
+
+  /**
+   * Get the combiner class for the job.
+   * 
+   * @return the combiner class for the job.
+   */
+  @SuppressWarnings("unchecked")
+  public Class<? extends Reducer<?,?,?,?>> getCombinerClass() 
+     throws ClassNotFoundException {
+    return (Class<? extends Reducer<?,?,?,?>>) 
+      conf.getClass(COMBINE_CLASS_ATTR, Reducer.class);
+  }
+
+  /**
+   * Get the {@link Reducer} class for the job.
+   * 
+   * @return the {@link Reducer} class for the job.
+   */
+  @SuppressWarnings("unchecked")
+  public Class<? extends Reducer<?,?,?,?>> getReducerClass() 
+     throws ClassNotFoundException {
+    return (Class<? extends Reducer<?,?,?,?>>) 
+      conf.getClass(REDUCE_CLASS_ATTR, Reducer.class);
+  }
+
+  /**
+   * Get the {@link OutputFormat} class for the job.
+   * 
+   * @return the {@link OutputFormat} class for the job.
+   */
+  @SuppressWarnings("unchecked")
+  public Class<? extends OutputFormat<?,?>> getOutputFormatClass() 
+     throws ClassNotFoundException {
+    return (Class<? extends OutputFormat<?,?>>) 
+      conf.getClass(OUTPUT_FORMAT_CLASS_ATTR, OutputFormat.class);
+  }
+
+  /**
+   * Get the {@link Partitioner} class for the job.
+   * 
+   * @return the {@link Partitioner} class for the job.
+   */
+  @SuppressWarnings("unchecked")
+  public Class<? extends Partitioner<?,?>> getPartitionerClass() 
+     throws ClassNotFoundException {
+    return (Class<? extends Partitioner<?,?>>) 
+      conf.getClass(PARTITIONER_CLASS_ATTR, Partitioner.class);
+  }
+
+  /**
+   * Get the {@link RawComparator} comparator used to compare keys.
+   * 
+   * @return the {@link RawComparator} comparator used to compare keys.
+   */
+  public RawComparator<?> getSortComparator() {
+    Class<?> theClass = conf.getClass(SORT_COMPARATOR_ATTR, null,
+                                   RawComparator.class);
+    if (theClass != null)
+      return (RawComparator<?>) ReflectionUtils.newInstance(theClass, conf);
+    return WritableComparator.get(getMapOutputKeyClass());
+  }
+
+  /** 
+   * Get the user defined {@link WritableComparable} comparator for 
+   * grouping keys of inputs to the reduce.
+   * 
+   * @return comparator set by the user for grouping values.
+   * @see Job#setGroupingComparatorClass(Class) for details.  
+   */
+  public RawComparator<?> getGroupingComparator() {
+    Class<?> theClass = conf.getClass(GROUPING_COMPARATOR_ATTR, null,
+                                   RawComparator.class);
+    if (theClass == null) {
+      return getSortComparator();
+    }
+    return (RawComparator<?>) ReflectionUtils.newInstance(theClass, conf);
+  }
+
+}
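
The defaulting behavior of the getters can be seen directly; a small sketch using the attribute keys defined above (the null JobID is just for illustration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.JobContext;

    public class JobContextDefaults {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setClass("mapreduce.out.key.class", Text.class, Object.class);

        JobContext ctx = new JobContext(conf, null);
        // No map output key class set, so the job output key class wins:
        System.out.println(ctx.getMapOutputKeyClass());   // class ...io.Text
        System.out.println(ctx.getNumReduceTasks());      // 1 (the default)
        System.out.println("'" + ctx.getJobName() + "'"); // '' (empty default)
      }
    }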

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobID.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobID.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobID.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobID.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.text.NumberFormat;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobTracker;
+
+/**
+ * JobID represents the immutable and unique identifier for 
+ * the job. JobID consists of two parts. The first part is the jobtracker 
+ * identifier, so that the mapping from jobID to jobtracker is well defined: 
+ * in a cluster setup this string is the jobtracker start time, while in the 
+ * local setting it is "local".
+ * The second part of the JobID is the job number. <br> 
+ * An example JobID is : 
+ * <code>job_200707121733_0003</code> , which represents the third job 
+ * running at the jobtracker started at <code>200707121733</code>. 
+ * <p>
+ * Applications should never construct or parse JobID strings, but rather 
+ * use appropriate constructors or {@link #forName(String)} method. 
+ * 
+ * @see TaskID
+ * @see TaskAttemptID
+ * @see JobTracker#getNewJobId()
+ * @see JobTracker#getStartTime()
+ */
+public class JobID extends ID {
+  private static final String JOB = "job";
+  private String jtIdentifier;
+  private static char UNDERSCORE = '_';
+  
+  private static NumberFormat idFormat = NumberFormat.getInstance();
+  static {
+    idFormat.setGroupingUsed(false);
+    idFormat.setMinimumIntegerDigits(4);
+  }
+  
+  /**
+   * Constructs a JobID object 
+   * @param jtIdentifier jobTracker identifier
+   * @param id job number
+   */
+  public JobID(String jtIdentifier, int id) {
+    super(id);
+    this.jtIdentifier = jtIdentifier;
+  }
+  
+  private JobID() { }
+  
+  public String getJtIdentifier() {
+    return jtIdentifier;
+  }
+  
+  @Override
+  public boolean equals(Object o) {
+    if(o == null)
+      return false;
+    if(o.getClass().equals(JobID.class)) {
+      JobID that = (JobID)o;
+      return this.id==that.id
+        && this.jtIdentifier.equals(that.jtIdentifier);
+    }
+    else return false;
+  }
+  
+  /** Compare JobIDs first by jtIdentifier, then by job number. */
+  @Override
+  public int compareTo(ID o) {
+    JobID that = (JobID)o;
+    int jtComp = this.jtIdentifier.compareTo(that.jtIdentifier);
+    if(jtComp == 0) {
+      return this.id - that.id;
+    }
+    else return jtComp;
+  }
+  
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    return builder.append(JOB).append(UNDERSCORE)
+      .append(toStringWOPrefix()).toString();
+  }
+  
+  /** Returns the string representation w/o prefix */
+  StringBuilder toStringWOPrefix() {
+    StringBuilder builder = new StringBuilder();
+    builder.append(jtIdentifier).append(UNDERSCORE)
+      .append(idFormat.format(id));
+    return builder;
+  }
+  
+  @Override
+  public int hashCode() {
+    return toStringWOPrefix().toString().hashCode();
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    this.jtIdentifier = Text.readString(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    Text.writeString(out, jtIdentifier);
+  }
+  
+  public static JobID read(DataInput in) throws IOException {
+    JobID jobId = new JobID();
+    jobId.readFields(in);
+    return jobId;
+  }
+  
+  /** Construct a JobId object from given string 
+   * @return constructed JobId object or null if the given String is null
+   * @throws IllegalArgumentException if the given string is malformed
+   */
+  public static JobID forName(String str) throws IllegalArgumentException {
+    if(str == null)
+      return null;
+    try {
+      String[] parts = str.split("_");
+      if(parts.length == 3) {
+        if(parts[0].equals(JOB)) {
+          return new JobID(parts[1], Integer.parseInt(parts[2]));
+        }
+      }
+    } catch (Exception ex) { // fall through to the failure below
+    }
+    throw new IllegalArgumentException("JobId string : " + str 
+        + " is not properly formed");
+  }
+  
+  /** 
+   * Returns a regex pattern which matches job IDs. Arguments can 
+   * be given null, in which case that part of the regex will be generic.  
+   * For example, to obtain a regex matching <i>any job</i> 
+   * run on the jobtracker started at <i>200707121733</i>, we would use :
+   * <pre> 
+   * JobID.getJobIDsPattern("200707121733", null);
+   * </pre>
+   * which will return :
+   * <pre> "job_200707121733_[0-9]*" </pre> 
+   * @param jtIdentifier jobTracker identifier, or null
+   * @param jobId job number, or null
+   * @return a regex pattern matching JobIDs
+   */
+  public static String getJobIDsPattern(String jtIdentifier, Integer jobId) {
+    StringBuilder builder = new StringBuilder(JOB).append(UNDERSCORE);
+    builder.append(getJobIDsPatternWOPrefix(jtIdentifier, jobId));
+    return builder.toString();
+  }
+  
+  static StringBuilder getJobIDsPatternWOPrefix(String jtIdentifier
+      , Integer jobId) {
+    StringBuilder builder = new StringBuilder()
+      .append(jtIdentifier != null ? jtIdentifier : "[^_]*").append(UNDERSCORE)
+      .append(jobId != null ? idFormat.format(jobId) : "[0-9]*");
+    return builder;
+  }
+  
+}
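
The string format documented above can be exercised end to end; a short sketch:

    import org.apache.hadoop.mapreduce.JobID;

    public class JobIdDemo {
      public static void main(String[] args) {
        JobID id = new JobID("200707121733", 3);
        // Job number is zero-padded to four digits:
        System.out.println(id);  // job_200707121733_0003

        // forName() inverts toString():
        System.out.println(
            id.equals(JobID.forName("job_200707121733_0003")));  // true

        // A regex matching every job on that jobtracker:
        System.out.println(JobID.getJobIDsPattern("200707121733", null));
        // job_200707121733_[0-9]*
      }
    }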

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MapContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MapContext.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MapContext.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MapContext.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * The context that is given to the {@link Mapper}.
+ * @param <KEYIN> the key input type to the Mapper
+ * @param <VALUEIN> the value input type to the Mapper
+ * @param <KEYOUT> the key output type from the Mapper
+ * @param <VALUEOUT> the value output type from the Mapper
+ */
+public abstract class MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
+  extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
+
+  public MapContext(Configuration conf, TaskAttemptID taskid) {
+    super(conf, taskid);
+  }
+
+  /**
+   * Get the input split for this map.
+   */
+  public abstract InputSplit getInputSplit();
+
+}
+     
\ No newline at end of file

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Mapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Mapper.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Mapper.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Mapper.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+/** 
+ * Maps input key/value pairs to a set of intermediate key/value pairs.  
+ * 
+ * <p>Maps are the individual tasks which transform input records into 
+ * intermediate records. The transformed intermediate records need not be of 
+ * the same type as the input records. A given input pair may map to zero or 
+ * many output pairs.</p> 
+ * 
+ * <p>The Hadoop Map-Reduce framework spawns one map task for each 
+ * {@link InputSplit} generated by the {@link InputFormat} for the job.
+ * <code>Mapper</code> implementations can access the {@link Configuration} for 
+ * the job via {@link JobContext#getConfiguration()}.</p>
+ * 
+ * <p>The framework first calls 
+ * {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by
+ * {@link #map(Object, Object, Context)} 
+ * for each key/value pair in the <code>InputSplit</code>. Finally 
+ * {@link #cleanup(Context)} is called.</p>
+ * 
+ * <p>All intermediate values associated with a given output key are 
+ * subsequently grouped by the framework, and passed to a {@link Reducer} to  
+ * determine the final output. Users can control the sorting and grouping by 
+ * specifying two key {@link RawComparator} classes.</p>
+ *
+ * <p>The <code>Mapper</code> outputs are partitioned per 
+ * <code>Reducer</code>. Users can control which keys (and hence records) go to 
+ * which <code>Reducer</code> by implementing a custom {@link Partitioner}.
+ * 
+ * <p>Users can optionally specify a <code>combiner</code>, via 
+ * {@link Job#setCombinerClass(Class)}, to perform local aggregation of the 
+ * intermediate outputs, which helps to cut down the amount of data transferred 
+ * from the <code>Mapper</code> to the <code>Reducer</code>.
+ * 
+ * <p>Applications can specify if and how the intermediate
+ * outputs are to be compressed and which {@link CompressionCodec}s are to be
+ * used via the <code>Configuration</code>.</p>
+ *  
+ * <p>If the job has zero
+ * reduces then the output of the <code>Mapper</code> is directly written
+ * to the {@link OutputFormat} without sorting by keys.</p>
+ * 
+ * <p>Example:</p>
+ * <p><blockquote><pre>
+ * public class TokenCounterMapper 
+ *     extends Mapper<Object, Text, Text, IntWritable>{
+ *    
+ *   private final static IntWritable one = new IntWritable(1);
+ *   private Text word = new Text();
+ *   
+ *   public void map(Object key, Text value, Context context
+ *       ) throws IOException, InterruptedException {
+ *     StringTokenizer itr = new StringTokenizer(value.toString());
+ *     while (itr.hasMoreTokens()) {
+ *       word.set(itr.nextToken());
+ *       context.collect(word, one);
+ *     }
+ *   }
+ * }
+ * </pre></blockquote></p>
+ *
+ * <p>Applications may override the {@link #run(Context)} method to exert 
+ * greater control on map processing e.g. multi-threaded <code>Mapper</code>s 
+ * etc.</p>
+ * 
+ * @see InputFormat
+ * @see JobContext
+ * @see Partitioner  
+ * @see Reducer
+ */
+public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+
+  public abstract class Context 
+    extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
+    public Context(Configuration conf, TaskAttemptID taskid) {
+      super(conf, taskid);
+    }
+  }
+  
+  /**
+   * Called once at the beginning of the task.
+   */
+  protected void setup(Context context
+                       ) throws IOException, InterruptedException {
+    // NOTHING
+  }
+
+  /**
+   * Called once for each key/value pair in the input split. Most applications
+   * should override this, but the default is the identity function.
+   */
+  @SuppressWarnings("unchecked")
+  protected void map(KEYIN key, VALUEIN value, 
+                     Context context) throws IOException, InterruptedException {
+    context.collect((KEYOUT) key, (VALUEOUT) value);
+  }
+
+  /**
+   * Called once at the end of the task.
+   */
+  protected void cleanup(Context context
+                         ) throws IOException, InterruptedException {
+    // NOTHING
+  }
+  
+  /**
+   * Expert users can override this method for more complete control over the
+   * execution of the Mapper.
+   * @param context
+   * @throws IOException
+   */
+  public void run(Context context) throws IOException, InterruptedException {
+    setup(context);
+    KEYIN key = context.nextKey(null);
+    VALUEIN value = null;
+    while (key != null) {
+      value = context.nextValue(value);
+      map(key, value, context);
+      key = context.nextKey(key);
+    }
+    cleanup(context);
+  }
+}
\ No newline at end of file
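
As a second illustration of the lifecycle (setup once, map per record, cleanup once), here is a hypothetical mapper that drops empty lines and passes everything else through; it uses context.collect(...) just as the default map() above does.

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class NonEmptyLineMapper extends Mapper<Object, Text, Object, Text> {
      private long dropped;

      @Override
      protected void setup(Context context) {
        dropped = 0;
      }

      @Override
      protected void map(Object key, Text value, Context context)
          throws IOException, InterruptedException {
        if (value.getLength() == 0) {
          dropped++;                    // filter: skip empty lines
        } else {
          context.collect(key, value);  // identity pass-through otherwise
        }
      }

      @Override
      protected void cleanup(Context context) {
        System.err.println("dropped " + dropped + " empty lines");
      }
    }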

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/OutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/OutputFormat.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/OutputFormat.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/OutputFormat.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+
+/** 
+ * <code>OutputFormat</code> describes the output-specification for a 
+ * Map-Reduce job.
+ *
+ * <p>The Map-Reduce framework relies on the <code>OutputFormat</code> of the
+ * job to:</p>
+ * <ol>
+ *   <li>
+ *   Validate the output-specification of the job; for example, check that the 
+ *   output directory doesn't already exist. 
+ *   </li>
+ *   <li>
+ *   Provide the {@link RecordWriter} implementation to be used to write out
+ *   the output files of the job. Output files are stored in a 
+ *   {@link FileSystem}.
+ *   </li>
+ * </ol>
+ * 
+ * @see RecordWriter
+ */
+public interface OutputFormat<K, V> {
+
+  /** 
+   * Get the {@link RecordWriter} for the given task.
+   *
+   * @param context the information about the current task.
+   * @return a {@link RecordWriter} to write the output for the job.
+   * @throws IOException
+   */
+  RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+  throws IOException, InterruptedException;
+
+  /** 
+   * Check for validity of the output-specification for the job.
+   *  
+   * <p>This is to validate the output specification for the job when it is
+   * submitted. Typically it checks that the output does not already exist,
+   * throwing an exception when it does, so that output is not
+   * overwritten.</p>
+   *
+   * @param context information about the job
+   * @throws IOException when output should not be attempted
+   */
+  void checkOutputSpecs(JobContext context
+                        ) throws IOException, InterruptedException;
+}
+
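
Neither method is implemented anywhere in this part of the diff, so the
following is only a sketch of how the two responsibilities split: fail fast at
submission time, then hand per-task writing to a RecordWriter. The class name
and the configuration key are assumptions for illustration; a real file-based
format would define its own equivalents.

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapreduce.JobContext;
  import org.apache.hadoop.mapreduce.OutputFormat;

  public abstract class DirCheckingOutputFormat<K, V>
      implements OutputFormat<K, V> {

    // Hypothetical key naming the output directory.
    public static final String OUTPUT_DIR = "mapred.output.dir";

    public void checkOutputSpecs(JobContext context)
        throws IOException, InterruptedException {
      Configuration conf = context.getConfiguration();
      Path outDir = new Path(conf.get(OUTPUT_DIR));
      FileSystem fs = outDir.getFileSystem(conf);
      if (fs.exists(outDir)) {
        throw new IOException("Output directory " + outDir
                              + " already exists; refusing to overwrite it");
      }
    }

    // getRecordWriter(TaskAttemptContext) is left to concrete subclasses.
  }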

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Partitioner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Partitioner.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Partitioner.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Partitioner.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+/** 
+ * Partitions the key space.
+ * 
+ * <p><code>Partitioner</code> controls the partitioning of the keys of the 
+ * intermediate map-outputs. The key (or a subset of the key) is used to derive
+ * the partition, typically by a hash function. The total number of partitions
+ * is the same as the number of reduce tasks for the job. Hence this controls
+ * which of the <code>m</code> reduce tasks the intermediate key (and hence the
+ * record) is sent to for reduction.</p>
+ * 
+ * @see Reducer
+ */
+public interface Partitioner<KEY, VALUE> {
+  
+  /** 
+   * Get the partition number for a given key (and hence record) given the
+   * total number of partitions, i.e. the number of reduce tasks for the job.
+   *
+   * <p>Typically a hash function on all or a subset of the key.</p>
+   *
+   * @param key the key to be partitioned.
+   * @param value the entry value.
+   * @param numPartitions the total number of partitions.
+   * @return the partition number for the <code>key</code>.
+   */
+  int getPartition(KEY key, VALUE value, int numPartitions);
+}
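
The "typically by a hash function" remark is easiest to ground with the
classic hash partitioner. A minimal implementation of this interface might
look like the sketch below; the class name is illustrative only.

  import org.apache.hadoop.mapreduce.Partitioner;

  public class HashPartitioner<K, V> implements Partitioner<K, V> {
    public int getPartition(K key, V value, int numPartitions) {
      // Mask off the sign bit so the modulus can never be negative.
      return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
  }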

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/RecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/RecordReader.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/RecordReader.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/RecordReader.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * The record reader breaks the data into key/value pairs for input to the
+ * {@link Mapper}.
+ * @param <KEYIN> the class of the input keys
+ * @param <VALUEIN> the class of the input values
+ */
+public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
+
+  /**
+   * Called once at initialization.
+   * @param split the split that defines the range of records to read
+   * @param context the information about the task
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract void initialize(InputSplit split,
+                                  TaskAttemptContext context
+                                  ) throws IOException, InterruptedException;
+
+  /**
+   * Read the next key.
+   * @param key the object to be read into, which may be null
+   * @return the object that was read
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract KEYIN nextKey(KEYIN key
+                                ) throws IOException, InterruptedException;
+
+  /**
+   * Read the next value. It must be called after {@link #nextKey(Object)}.
+   * @param value the object to read into, which may be null
+   * @return the object that was read
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract VALUEIN nextValue(VALUEIN value
+                                    ) throws IOException, InterruptedException;
+  
+  /**
+   * The current progress of the record reader through its data.
+   * @return a number between 0.0 and 1.0 that is the fraction of the data read
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract float getProgress() throws IOException, InterruptedException;
+  
+  /**
+   * Close the record reader.
+   */
+  public abstract void close() throws IOException;
+}
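
The protocol here is pull-based: nextKey() returns null at end of input, and
nextValue() must be called after the key it belongs to. A toy reader over
in-memory arrays makes the life cycle concrete; everything about this sketch
(the name, the decision to ignore the reusable objects) is illustrative.

  import java.io.IOException;
  import org.apache.hadoop.mapreduce.InputSplit;
  import org.apache.hadoop.mapreduce.RecordReader;
  import org.apache.hadoop.mapreduce.TaskAttemptContext;

  public class ArrayRecordReader<K, V> extends RecordReader<K, V> {
    private final K[] keys;
    private final V[] values;
    private int index = -1;

    public ArrayRecordReader(K[] keys, V[] values) {
      this.keys = keys;
      this.values = values;
    }

    public void initialize(InputSplit split, TaskAttemptContext context) {
      // Nothing to open; a real reader would seek to the split's range here.
    }

    public K nextKey(K key) {
      // Ignores the reusable object; a Writable-based reader would refill it.
      index += 1;
      return index < keys.length ? keys[index] : null;
    }

    public V nextValue(V value) {
      return values[index];
    }

    public float getProgress() {
      return keys.length == 0 ? 1.0f
          : Math.min(1.0f, (index + 1) / (float) keys.length);
    }

    public void close() throws IOException {
      // No resources to release.
    }
  }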

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/RecordWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/RecordWriter.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/RecordWriter.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/RecordWriter.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+
+/**
+ * <code>RecordWriter</code> writes the output &lt;key, value&gt; pairs 
+ * to an output file.
+ *
+ * <p><code>RecordWriter</code> implementations write the job outputs to the
+ * {@link FileSystem}.</p>
+ * 
+ * @see OutputFormat
+ */
+public interface RecordWriter<K, V> {
+  /** 
+   * Writes a key/value pair.
+   *
+   * @param key the key to write.
+   * @param value the value to write.
+   * @throws IOException
+   */      
+  void write(K key, V value) throws IOException, InterruptedException;
+
+  /** 
+   * Close this <code>RecordWriter</code> to future operations.
+   * 
+   * @param context the context of the task
+   * @throws IOException
+   */ 
+  void close(TaskAttemptContext context
+             ) throws IOException, InterruptedException;
+}
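
As a sketch of the contract — write each pair during the task, then release
resources in close() — here is a writer that emits tab-separated lines of
text. The class name and the idea that an (unshown) output format supplies
the stream are assumptions for illustration.

  import java.io.DataOutputStream;
  import java.io.IOException;
  import org.apache.hadoop.mapreduce.RecordWriter;
  import org.apache.hadoop.mapreduce.TaskAttemptContext;

  public class TextLineRecordWriter<K, V> implements RecordWriter<K, V> {
    private final DataOutputStream out;

    public TextLineRecordWriter(DataOutputStream out) {
      this.out = out;
    }

    public void write(K key, V value) throws IOException {
      // One record per line: key, tab, value.
      out.writeBytes(key.toString());
      out.writeBytes("\t");
      out.writeBytes(value.toString());
      out.writeBytes("\n");
    }

    public void close(TaskAttemptContext context) throws IOException {
      out.close();
    }
  }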

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * The context passed to the {@link Reducer}.
+ * @param <KEYIN> the class of the input keys
+ * @param <VALUEIN> the class of the input values
+ * @param <KEYOUT> the class of the output keys
+ * @param <VALUEOUT> the class of the output values
+ */
+public abstract class ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
+    extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
+
+  public ReduceContext(Configuration conf, TaskAttemptID taskid) {
+    super(conf, taskid);
+  }
+
+  /**
+   * Iterate through the values for the current key.
+   */
+  public abstract 
+  Iterable<VALUEIN> getValues() throws IOException, InterruptedException;
+
+}
\ No newline at end of file

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Reducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Reducer.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Reducer.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Reducer.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+/** 
+ * Reduces a set of intermediate values which share a key to a smaller set of
+ * values.  
+ * 
+ * <p><code>Reducer</code> implementations 
+ * can access the {@link Configuration} for the job via the 
+ * {@link JobContext#getConfiguration()} method.</p>
+ *
+ * <p><code>Reducer</code> has 3 primary phases:</p>
+ * <ol>
+ *   <li>
+ *   
+ *   <h4 id="Shuffle">Shuffle</h4>
+ *   
+ *   <p>The <code>Reducer</code> copies the sorted output from each 
+ *   {@link Mapper} using HTTP across the network.</p>
+ *   </li>
+ *   
+ *   <li>
+ *   <h4 id="Sort">Sort</h4>
+ *   
+ *   <p>The framework merge sorts <code>Reducer</code> inputs by 
+ *   <code>key</code>s 
+ *   (since different <code>Mapper</code>s may have output the same key).</p>
+ *   
+ *   <p>The shuffle and sort phases occur simultaneously, i.e. while outputs
+ *   are being fetched they are merged.</p>
+ *      
+ *   <h5 id="SecondarySort">Secondary Sort</h5>
+ *   
+ *   <p>To achieve a secondary sort on the values returned by the value 
+ *   iterator, the application should extend the key with the secondary
+ *   key and define a grouping comparator. The keys will be sorted using the
+ *   entire key, but will be grouped using the grouping comparator to decide
+ *   which keys and values are sent in the same call to reduce. The grouping
+ *   comparator is specified via 
+ *   {@link Job#setGroupingComparatorClass(Class)}. The sort order is
+ *   controlled by 
+ *   {@link Job#setSortComparatorClass(Class)}.</p>
+ *   
+ *   
+ *   For example, say that you want to find duplicate web pages and tag them 
+ *   all with the url of the "best" known example. You would set up the job 
+ *   like:
+ *   <ul>
+ *     <li>Map Input Key: url</li>
+ *     <li>Map Input Value: document</li>
+ *     <li>Map Output Key: document checksum, url pagerank</li>
+ *     <li>Map Output Value: url</li>
+ *     <li>Partitioner: by checksum</li>
+ *     <li>OutputKeyComparator: by checksum and then decreasing pagerank</li>
+ *     <li>OutputValueGroupingComparator: by checksum</li>
+ *   </ul>
+ *   </li>
+ *   
+ *   <li>   
+ *   <h4 id="Reduce">Reduce</h4>
+ *   
+ *   <p>In this phase the 
+ *   {@link #reduce(Object, Iterable, Context)}
+ *   method is called for each <code>&lt;key, (collection of values)&gt;</code> in
+ *   the sorted inputs.</p>
+ *   <p>The output of the reduce task is typically written to a 
+ *   {@link RecordWriter} via 
+ *   {@link Context#collect(Object, Object)}.</p>
+ *   </li>
+ * </ol>
+ * 
+ * <p>The output of the <code>Reducer</code> is <b>not re-sorted</b>.</p>
+ * 
+ * <p>Example:</p>
+ * <p><blockquote><pre>
+ * public class IntSumReducer<Key> extends Reducer<Key,IntWritable,
+ *                                                 Key,IntWritable> {
+ *   private IntWritable result = new IntWritable();
+ * 
+ *   public void reduce(Key key, Iterable<IntWritable> values, 
+ *                      Context context) throws IOException, InterruptedException {
+ *     int sum = 0;
+ *     for (IntWritable val : values) {
+ *       sum += val.get();
+ *     }
+ *     result.set(sum);
+ *     context.collect(key, result);
+ *   }
+ * }
+ * </pre></blockquote></p>
+ * 
+ * @see Mapper
+ * @see Partitioner
+ */
+public abstract class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
+
+  protected abstract class Context 
+    extends ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
+    public Context(Configuration conf, TaskAttemptID taskid) {
+      super(conf, taskid);
+    }
+  }
+
+  /**
+   * Called once at the start of the task.
+   */
+  protected void setup(Context context
+                       ) throws IOException, InterruptedException {
+    // NOTHING
+  }
+
+  /**
+   * This method is called once for each key. Most applications will define
+   * their reduce class by overriding this method. The default implementation
+   * is an identity function.
+   */
+  @SuppressWarnings("unchecked")
+  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
+                        ) throws IOException, InterruptedException {
+    for(VALUEIN value: values) {
+      context.collect((KEYOUT) key, (VALUEOUT) value);
+    }
+  }
+
+  /**
+   * Called once at the end of the task.
+   */
+  protected void cleanup(Context context
+                         ) throws IOException, InterruptedException {
+    // NOTHING
+  }
+
+  /**
+   * Advanced application writers can use the 
+   * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
+   * control how the reduce task works.
+   */
+  public void run(Context context) throws IOException, InterruptedException {
+    setup(context);
+    KEYIN key = context.nextKey(null);
+    while(key != null) {
+      reduce(key, context.getValues(), context);
+      key = context.nextKey(key);
+    }
+    cleanup(context);
+  }
+}
\ No newline at end of file
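
To make the secondary-sort recipe in the javadoc above concrete, the
duplicate-page job would wire its comparators into Job roughly as follows.
ChecksumPartitioner, ChecksumThenPageRankComparator and
ChecksumGroupingComparator are hypothetical stand-ins for the bullet list,
and the Job constructor and setPartitionerClass() call are assumptions about
Job.java, which this part of the diff does not show.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.Job;

  public class DuplicatePageJob {
    public static Job configure(Configuration conf) throws Exception {
      Job job = new Job(conf);  // assumed constructor
      // Route every record with the same checksum to the same reduce task.
      job.setPartitionerClass(ChecksumPartitioner.class);  // assumed setter
      // Sort by the full composite key: checksum, then decreasing pagerank.
      job.setSortComparatorClass(ChecksumThenPageRankComparator.class);
      // Group reduce() calls by checksum alone, so the best-ranked url
      // arrives first in each group's value iterator.
      job.setGroupingComparatorClass(ChecksumGroupingComparator.class);
      return job;
    }
  }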

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskAttemptContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskAttemptContext.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskAttemptContext.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskAttemptContext.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * The context for task attempts.
+ */
+public abstract class TaskAttemptContext extends JobContext 
+    implements Progressable {
+  private final TaskAttemptID taskId;
+  private String status = "";
+  
+  public TaskAttemptContext(Configuration conf, TaskAttemptID taskId) {
+    super(conf, taskId.getJobID());
+    this.taskId = taskId;
+  }
+
+  /**
+   * Get the unique name for this task attempt.
+   */
+  public TaskAttemptID getTaskAttemptId() {
+    return taskId;
+  }
+
+  /**
+   * Set the current status of the task to the given string.
+   */
+  public void setStatus(String msg) throws IOException {
+    status = msg;
+  }
+
+  /**
+   * Get the last set status message.
+   * @return the current status message
+   */
+  public String getStatus() {
+    return status;
+  }
+
+  /**
+   * Look up a counter by an enum constant.
+   */
+  public abstract Counter getCounter(Enum<?> counterName);
+
+  /**
+   * Look up a counter by group and counter name. The enum-based interface is
+   * preferred.
+   */
+  public abstract Counter getCounter(String groupName, String counterName);
+}
\ No newline at end of file
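
Tasks reach this class through their Context objects. A sketch of typical use
from inside a mapper's setup() follows; the class and enum names are
illustrative, it assumes Mapper's Context ultimately extends
TaskAttemptContext, and Counter's mutators (defined in Counter.java, not shown
in this part of the diff) are deliberately not called.

  import java.io.IOException;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Counter;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.TaskAttemptID;

  public class StatusAwareMapper extends Mapper<Text, Text, Text, IntWritable> {
    enum Records { SEEN }  // illustrative counter enum

    protected void setup(Context context)
        throws IOException, InterruptedException {
      TaskAttemptID id = context.getTaskAttemptId();
      context.setStatus("attempt " + id + " starting");
      // The enum overload is preferred over the (group, name) lookup.
      Counter seen = context.getCounter(Records.SEEN);
    }
  }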

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskAttemptID.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskAttemptID.java?rev=679845&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskAttemptID.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskAttemptID.java Fri Jul 25 08:57:41 2008
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * TaskAttemptID represents the immutable and unique identifier for 
+ * a task attempt. Each task attempt is one particular instance of a Map or
+ * Reduce Task identified by its TaskID. 
+ * 
+ * TaskAttemptID consists of 2 parts. First part is the 
+ * {@link TaskID}, that this TaskAttemptID belongs to.
+ * Second part is the task attempt number. <br> 
+ * An example TaskAttemptID is
+ * <code>attempt_200707121733_0003_m_000005_0</code>, which represents the
+ * zeroth task attempt for the fifth map task in the third job
+ * running at the jobtracker started at <code>200707121733</code>.
+ * <p>
+ * Applications should never construct or parse TaskAttemptID strings,
+ * but rather use the appropriate constructors or the {@link #forName(String)}
+ * method.
+ * 
+ * @see JobID
+ * @see TaskID
+ */
+public class TaskAttemptID extends ID {
+  private static final String ATTEMPT = "attempt";
+  private TaskID taskId;
+  private static final char UNDERSCORE = '_';
+  
+  /**
+   * Constructs a TaskAttemptID object from given {@link TaskID}.  
+   * @param taskId TaskID that this task belongs to  
+   * @param id the task attempt number
+   */
+  public TaskAttemptID(TaskID taskId, int id) {
+    super(id);
+    if(taskId == null) {
+      throw new IllegalArgumentException("taskId cannot be null");
+    }
+    this.taskId = taskId;
+  }
+  
+  /**
+   * Constructs a TaskAttemptID object from the given parts.
+   * @param jtIdentifier jobTracker identifier
+   * @param jobId job number 
+   * @param isMap whether the tip is a map 
+   * @param taskId taskId number
+   * @param id the task attempt number
+   */
+  public TaskAttemptID(String jtIdentifier, int jobId, boolean isMap
+      , int taskId, int id) {
+    this(new TaskID(jtIdentifier, jobId, isMap, taskId), id);
+  }
+  
+  private TaskAttemptID() { }
+  
+  /** Returns the {@link JobID} object that this task attempt belongs to */
+  public JobID getJobID() {
+    return taskId.getJobID();
+  }
+  
+  /** Returns the {@link TaskID} object that this task attempt belongs to */
+  public TaskID getTaskID() {
+    return taskId;
+  }
+  
+  /** Returns whether this TaskAttemptID is a map ID. */
+  public boolean isMap() {
+    return taskId.isMap();
+  }
+  
+  @Override
+  public boolean equals(Object o) {
+    if(o == null)
+      return false;
+    if(o.getClass().equals(TaskAttemptID.class)) {
+      TaskAttemptID that = (TaskAttemptID)o;
+      return this.id==that.id
+        && this.taskId.equals(that.taskId);
+    }
+    else return false;
+  }
+  
+  /** Compare TaskAttemptIDs first by their TaskIDs, then by attempt numbers. */
+  @Override
+  public int compareTo(ID o) {
+    TaskAttemptID that = (TaskAttemptID)o;
+    int tipComp = this.taskId.compareTo(that.taskId);
+    if(tipComp == 0) {
+      return this.id - that.id;
+    }
+    else return tipComp;
+  }
+  @Override
+  public String toString() { 
+    StringBuilder builder = new StringBuilder();
+    return builder.append(ATTEMPT).append(UNDERSCORE)
+      .append(toStringWOPrefix()).toString();
+  }
+
+  StringBuilder toStringWOPrefix() {
+    StringBuilder builder = new StringBuilder();
+    return builder.append(taskId.toStringWOPrefix())
+      .append(UNDERSCORE).append(id);
+  }
+  
+  @Override
+  public int hashCode() {
+    return toStringWOPrefix().toString().hashCode();
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    this.taskId = TaskID.read(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    taskId.write(out);
+  }
+  
+  public static TaskAttemptID read(DataInput in) throws IOException {
+    TaskAttemptID taskId = new TaskAttemptID();
+    taskId.readFields(in);
+    return taskId;
+  }
+  
+  /** Construct a TaskAttemptID object from a given string.
+   * @return constructed TaskAttemptID object or null if the given String is null
+   * @throws IllegalArgumentException if the given string is malformed
+   */
+  public static TaskAttemptID forName(String str) throws IllegalArgumentException {
+    if(str == null)
+      return null;
+    try {
+      String[] parts = str.split("_");
+      if(parts.length == 6) {
+        if(parts[0].equals(ATTEMPT)) {
+          boolean isMap = false;
+          if(parts[3].equals("m")) isMap = true;
+          else if(parts[3].equals("r")) isMap = false;
+          else throw new Exception();
+          return new TaskAttemptID(parts[1], Integer.parseInt(parts[2]),
+              isMap, Integer.parseInt(parts[4]), Integer.parseInt(parts[5]));
+        }
+      }
+    } catch (Exception ex) { // fall through to the error below
+    }
+    throw new IllegalArgumentException("TaskAttemptId string : " + str 
+        + " is not properly formed");
+  }
+  
+  /** 
+   * Returns a regex pattern which matches task attempt IDs. Arguments can 
+   * be given null, in which case that part of the regex will be generic.  
+   * For example to obtain a regex matching <i>all task attempt IDs</i> 
+   * of <i>any jobtracker</i>, in <i>any job</i>, of the <i>first 
+   * map task</i>, we would use :
+   * <pre> 
+   * TaskAttemptID.getTaskAttemptIDsPattern(null, null, true, 1, null);
+   * </pre>
+   * which will return :
+   * <pre> "attempt_[^_]*_[0-9]*_m_000001_[0-9]*" </pre> 
+   * @param jtIdentifier jobTracker identifier, or null
+   * @param jobId job number, or null
+   * @param isMap whether the tip is a map, or null 
+   * @param taskId taskId number, or null
+   * @param attemptId the task attempt number, or null
+   * @return a regex pattern matching TaskAttemptIDs
+   */
+  public static String getTaskAttemptIDsPattern(String jtIdentifier,
+      Integer jobId, Boolean isMap, Integer taskId, Integer attemptId) {
+    StringBuilder builder = new StringBuilder(ATTEMPT).append(UNDERSCORE);
+    builder.append(getTaskAttemptIDsPatternWOPrefix(jtIdentifier, jobId,
+        isMap, taskId, attemptId));
+    return builder.toString();
+  }
+  
+  static StringBuilder getTaskAttemptIDsPatternWOPrefix(String jtIdentifier
+      , Integer jobId, Boolean isMap, Integer taskId, Integer attemptId) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(TaskID.getTaskIDsPatternWOPrefix(jtIdentifier
+        , jobId, isMap, taskId))
+        .append(UNDERSCORE)
+        .append(attemptId != null ? attemptId : "[0-9]*");
+    return builder;
+  }
+  
+}
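
A round-trip through forName()/toString(), plus a pattern lookup, shows the
pieces fitting together. The comments state what the formatting code above
should produce, assuming TaskID and JobID zero-pad their numbers the way the
example string in the javadoc suggests.

  import org.apache.hadoop.mapreduce.TaskAttemptID;

  public class TaskAttemptIDDemo {
    public static void main(String[] args) {
      TaskAttemptID id =
          TaskAttemptID.forName("attempt_200707121733_0003_m_000005_0");
      System.out.println(id.isMap());    // true
      System.out.println(id.getJobID()); // the job portion of the ID
      System.out.println(id);            // should round-trip the input string

      // Regex for every attempt of map task 5 of that job; passing null
      // leaves the attempt-number slot generic.
      String pattern = TaskAttemptID.getTaskAttemptIDsPattern(
          "200707121733", 3, true, 5, null);
      System.out.println(id.toString().matches(pattern));  // expected: true
    }
  }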


