hadoop-common-commits mailing list archives

From tomwh...@apache.org
Subject svn commit: r1235548 [4/8] - in /hadoop/common/branches/branch-1: ./ src/core/org/apache/hadoop/conf/ src/core/org/apache/hadoop/io/ src/mapred/org/apache/hadoop/mapreduce/ src/mapred/org/apache/hadoop/mapreduce/lib/db/ src/mapred/org/apache/hadoop/map...
Date Tue, 24 Jan 2012 23:22:01 GMT
Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFilter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFilter.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFilter.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFilter.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.DigestException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A class that allows a map/reduce job to work on a sample of sequence files.
+ * The sample is decided by the filter class set by the job.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class SequenceFileInputFilter<K, V>
+    extends SequenceFileInputFormat<K, V> {
+  public static final Log LOG = LogFactory.getLog(SequenceFileInputFilter.class);
+  
+  final public static String FILTER_CLASS = 
+    "mapreduce.input.sequencefileinputfilter.class";
+  final public static String FILTER_FREQUENCY = 
+    "mapreduce.input.sequencefileinputfilter.frequency";
+  final public static String FILTER_REGEX = 
+    "mapreduce.input.sequencefileinputfilter.regex";
+    
+  public SequenceFileInputFilter() {
+  }
+    
+  /** Create a record reader for the given split
+   * @param split file split
+   * @param context the task-attempt context
+   * @return RecordReader
+   */
+  public RecordReader<K, V> createRecordReader(InputSplit split,
+      TaskAttemptContext context) throws IOException {
+    context.setStatus(split.toString());
+    return new FilterRecordReader<K, V>(context.getConfiguration());
+  }
+
+
+  /** set the filter class
+   * 
+   * @param job The job
+   * @param filterClass filter class
+   */
+  public static void setFilterClass(Job job, Class<?> filterClass) {
+    job.getConfiguration().set(FILTER_CLASS, filterClass.getName());
+  }
+
+         
+  /**
+   * filter interface
+   */
+  public interface Filter extends Configurable {
+    /** filter function
+     * Decide if a record should be filtered or not
+     * @param key record key
+     * @return true if a record is accepted; return false otherwise
+     */
+    public abstract boolean accept(Object key);
+  }
+    
+  /**
+   * base class for Filters
+   */
+  public static abstract class FilterBase implements Filter {
+    Configuration conf;
+        
+    public Configuration getConf() {
+      return conf;
+    }
+  }
+    
+  /** Filters records by matching the key against a regex.
+   */
+  public static class RegexFilter extends FilterBase {
+    private Pattern p;
+    /** Defines the filtering regex and stores it in conf
+     * @param conf where the regex is set
+     * @param regex regex used as a filter
+     */
+    public static void setPattern(Configuration conf, String regex)
+        throws PatternSyntaxException {
+      try {
+        Pattern.compile(regex);
+      } catch (PatternSyntaxException e) {
+        throw new IllegalArgumentException("Invalid pattern: "+regex);
+      }
+      conf.set(FILTER_REGEX, regex);
+    }
+        
+    public RegexFilter() { }
+        
+    /** configure the Filter by checking the configuration
+     */
+    public void setConf(Configuration conf) {
+      String regex = conf.get(FILTER_REGEX);
+      if (regex == null)
+        throw new RuntimeException(FILTER_REGEX + " not set");
+      this.p = Pattern.compile(regex);
+      this.conf = conf;
+    }
+
+
+    /** Filtering method
+     * If key matches the regex, return true; otherwise return false
+     * @see Filter#accept(Object)
+     */
+    public boolean accept(Object key) {
+      return p.matcher(key.toString()).matches();
+    }
+  }
+
+  /** This class returns a percentage of records.
+   * The percentage is determined by a filtering frequency <i>f</i> using
+   * the criterion record# % f == 0.
+   * For example, if the frequency is 10, one out of 10 records is returned.
+   */
+  public static class PercentFilter extends FilterBase {
+    private int frequency;
+    private int count;
+
+    /** Sets the frequency and stores it in conf
+     * @param conf configuration
+     * @param frequency filtering frequency
+     */
+    public static void setFrequency(Configuration conf, int frequency) {
+      if (frequency <= 0)
+        throw new IllegalArgumentException(
+          "Negative " + FILTER_FREQUENCY + ": " + frequency);
+      conf.setInt(FILTER_FREQUENCY, frequency);
+    }
+        
+    public PercentFilter() { }
+        
+    /** configure the filter by checking the configuration
+     * 
+     * @param conf configuration
+     */
+    public void setConf(Configuration conf) {
+      this.frequency = conf.getInt(FILTER_FREQUENCY, 10);
+      if (this.frequency <= 0) {
+        throw new RuntimeException(
+          "Negative "+FILTER_FREQUENCY + ": " + this.frequency);
+      }
+      this.conf = conf;
+    }
+
+    /** Filtering method
+     * If record# % frequency==0, return true; otherwise return false
+     * @see Filter#accept(Object)
+     */
+    public boolean accept(Object key) {
+      boolean accepted = false;
+      if (count == 0)
+        accepted = true;
+      if (++count == frequency) {
+        count = 0;
+      }
+      return accepted;
+    }
+  }
+
+  /** This class returns a set of records by examining the MD5 digest of its
+   * key against a filtering frequency <i>f</i>. The filtering criterion is
+   * MD5(key) % f == 0.
+   */
+  public static class MD5Filter extends FilterBase {
+    private int frequency;
+    private static final MessageDigest DIGESTER;
+    public static final int MD5_LEN = 16;
+    private byte [] digest = new byte[MD5_LEN];
+        
+    static {
+      try {
+        DIGESTER = MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+
+    /** set the filtering frequency in configuration
+     * 
+     * @param conf configuration
+     * @param frequency filtering frequency
+     */
+    public static void setFrequency(Configuration conf, int frequency) {
+      if (frequency <= 0)
+        throw new IllegalArgumentException(
+          "Negative " + FILTER_FREQUENCY + ": " + frequency);
+      conf.setInt(FILTER_FREQUENCY, frequency);
+    }
+        
+    public MD5Filter() { }
+        
+    /** configure the filter according to configuration
+     * 
+     * @param conf configuration
+     */
+    public void setConf(Configuration conf) {
+      this.frequency = conf.getInt(FILTER_FREQUENCY, 10);
+      if (this.frequency <= 0) {
+        throw new RuntimeException(
+          "Negative " + FILTER_FREQUENCY + ": " + this.frequency);
+      }
+      this.conf = conf;
+    }
+
+    /** Filtering method
+     * If MD5(key) % frequency==0, return true; otherwise return false
+     * @see Filter#accept(Object)
+     */
+    public boolean accept(Object key) {
+      try {
+        long hashcode;
+        if (key instanceof Text) {
+          hashcode = MD5Hashcode((Text)key);
+        } else if (key instanceof BytesWritable) {
+          hashcode = MD5Hashcode((BytesWritable)key);
+        } else {
+          ByteBuffer bb;
+          bb = Text.encode(key.toString());
+          hashcode = MD5Hashcode(bb.array(), 0, bb.limit());
+        }
+        if (hashcode / frequency * frequency == hashcode)
+          return true;
+      } catch(Exception e) {
+        LOG.warn(e);
+        throw new RuntimeException(e);
+      }
+      return false;
+    }
+        
+    private long MD5Hashcode(Text key) throws DigestException {
+      return MD5Hashcode(key.getBytes(), 0, key.getLength());
+    }
+        
+    private long MD5Hashcode(BytesWritable key) throws DigestException {
+      return MD5Hashcode(key.getBytes(), 0, key.getLength());
+    }
+    
+    synchronized private long MD5Hashcode(byte[] bytes, 
+        int start, int length) throws DigestException {
+      DIGESTER.update(bytes, 0, length);
+      DIGESTER.digest(digest, 0, MD5_LEN);
+      long hashcode=0;
+      for (int i = 0; i < 8; i++)
+        hashcode |= ((digest[i] & 0xffL) << (8 * (7 - i)));
+      return hashcode;
+    }
+  }
+    
+  private static class FilterRecordReader<K, V>
+      extends SequenceFileRecordReader<K, V> {
+    
+    private Filter filter;
+    private K key;
+    private V value;
+        
+    public FilterRecordReader(Configuration conf)
+        throws IOException {
+      super();
+      // instantiate filter
+      filter = (Filter)ReflectionUtils.newInstance(
+        conf.getClass(FILTER_CLASS, PercentFilter.class), conf);
+    }
+    
+    public synchronized boolean nextKeyValue() 
+        throws IOException, InterruptedException {
+      while (super.nextKeyValue()) {
+        key = super.getCurrentKey();
+        if (filter.accept(key)) {
+          value = super.getCurrentValue();
+          return true;
+        }
+      }
+      return false;
+    }
+    
+    @Override
+    public K getCurrentKey() {
+      return key;
+    }
+    
+    @Override
+    public V getCurrentValue() {
+      return value;
+    }
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFilter.java
------------------------------------------------------------------------------
    svn:eol-style = native
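
For reference, a minimal driver sketch showing how the filters above are wired into a job. The class name, paths, and regex below are illustrative assumptions, not part of this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class FilteredSampleJob {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Validate and store the regex before the Job copies the conf.
        SequenceFileInputFilter.RegexFilter.setPattern(conf, "user_[0-9]+");

        Job job = new Job(conf, "sequence-file-sample");
        job.setInputFormatClass(SequenceFileInputFilter.class);
        // Only records whose keys match the regex reach the mapper.
        SequenceFileInputFilter.setFilterClass(job,
            SequenceFileInputFilter.RegexFilter.class);
        FileInputFormat.addInputPath(job, new Path("in"));       // hypothetical path
        FileOutputFormat.setOutputPath(job, new Path("out"));    // hypothetical path
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

PercentFilter and MD5Filter are wired the same way, with PercentFilter.setFrequency(conf, n) or MD5Filter.setFrequency(conf, n) in place of the regex call.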

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * An {@link InputSplit} that tags another InputSplit with extra data for use
+ * by {@link DelegatingInputFormat}s and {@link DelegatingMapper}s.
+ */
+class TaggedInputSplit extends InputSplit implements Configurable, Writable {
+
+  private Class<? extends InputSplit> inputSplitClass;
+
+  private InputSplit inputSplit;
+
+  @SuppressWarnings("unchecked")
+  private Class<? extends InputFormat> inputFormatClass;
+
+  @SuppressWarnings("unchecked")
+  private Class<? extends Mapper> mapperClass;
+
+  private Configuration conf;
+
+  public TaggedInputSplit() {
+    // Default constructor.
+  }
+
+  /**
+   * Creates a new TaggedInputSplit.
+   * 
+   * @param inputSplit The InputSplit to be tagged
+   * @param conf The configuration to use
+   * @param inputFormatClass The InputFormat class to use for this job
+   * @param mapperClass The Mapper class to use for this job
+   */
+  @SuppressWarnings("unchecked")
+  public TaggedInputSplit(InputSplit inputSplit, Configuration conf,
+      Class<? extends InputFormat> inputFormatClass,
+      Class<? extends Mapper> mapperClass) {
+    this.inputSplitClass = inputSplit.getClass();
+    this.inputSplit = inputSplit;
+    this.conf = conf;
+    this.inputFormatClass = inputFormatClass;
+    this.mapperClass = mapperClass;
+  }
+
+  /**
+   * Retrieves the original InputSplit.
+   * 
+   * @return The InputSplit that was tagged
+   */
+  public InputSplit getInputSplit() {
+    return inputSplit;
+  }
+
+  /**
+   * Retrieves the InputFormat class to use for this split.
+   * 
+   * @return The InputFormat class to use
+   */
+  @SuppressWarnings("unchecked")
+  public Class<? extends InputFormat> getInputFormatClass() {
+    return inputFormatClass;
+  }
+
+  /**
+   * Retrieves the Mapper class to use for this split.
+   * 
+   * @return The Mapper class to use
+   */
+  @SuppressWarnings("unchecked")
+  public Class<? extends Mapper> getMapperClass() {
+    return mapperClass;
+  }
+
+  public long getLength() throws IOException, InterruptedException {
+    return inputSplit.getLength();
+  }
+
+  public String[] getLocations() throws IOException, InterruptedException {
+    return inputSplit.getLocations();
+  }
+
+  @SuppressWarnings("unchecked")
+  public void readFields(DataInput in) throws IOException {
+    inputSplitClass = (Class<? extends InputSplit>) readClass(in);
+    inputFormatClass = (Class<? extends InputFormat<?, ?>>) readClass(in);
+    mapperClass = (Class<? extends Mapper<?, ?, ?, ?>>) readClass(in);
+    inputSplit = (InputSplit) ReflectionUtils
+       .newInstance(inputSplitClass, conf);
+    SerializationFactory factory = new SerializationFactory(conf);
+    Deserializer deserializer = factory.getDeserializer(inputSplitClass);
+    deserializer.open((DataInputStream)in);
+    inputSplit = (InputSplit)deserializer.deserialize(inputSplit);
+  }
+
+  private Class<?> readClass(DataInput in) throws IOException {
+    String className = Text.readString(in);
+    try {
+      return conf.getClassByName(className);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException("readObject can't find class", e);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, inputSplitClass.getName());
+    Text.writeString(out, inputFormatClass.getName());
+    Text.writeString(out, mapperClass.getName());
+    SerializationFactory factory = new SerializationFactory(conf);
+    Serializer serializer = 
+          factory.getSerializer(inputSplitClass);
+    serializer.open((DataOutputStream)out);
+    serializer.serialize(inputSplit);
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java
------------------------------------------------------------------------------
    svn:eol-style = native
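
TaggedInputSplit is package-private and is normally exercised only through DelegatingInputFormat. Below is a minimal round-trip sketch of how write() and readFields() cooperate, assuming a helper class placed in the same package; everything here is illustrative, not part of this commit:

    package org.apache.hadoop.mapreduce.lib.input;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Mapper;

    public class TaggedSplitRoundTrip {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSplit original =
            new FileSplit(new Path("/tmp/data"), 0L, 1024L, new String[0]);
        TaggedInputSplit tagged = new TaggedInputSplit(
            original, conf, TextInputFormat.class, Mapper.class);

        // Serialize as the framework would when shipping the split to a task.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        tagged.write(new DataOutputStream(bytes));

        // readFields resolves the three class names via the Configuration,
        // so the conf must be injected before deserializing.
        TaggedInputSplit copy = new TaggedInputSplit();
        copy.setConf(conf);
        copy.readFields(
            new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy.getInputFormatClass().getName());
      }
    }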

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/ControlledJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/ControlledJob.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/ControlledJob.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/ControlledJob.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.jobcontrol;
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.StringUtils;
+
+/** 
+ *  This class encapsulates a MapReduce job and its dependencies. It monitors 
+ *  the states of the depending jobs and updates the state of this job.
+ *  A job starts in the WAITING state. If it does not have any depending jobs,
+ *  or all of its depending jobs are in the SUCCESS state, the job state 
+ *  becomes READY. If any depending job fails, this job fails too. 
+ *  When in the READY state, the job can be submitted to Hadoop for execution;
+ *  its state then changes to RUNNING. From the RUNNING state, the job 
+ *  can reach the SUCCESS or FAILED state, depending on 
+ *  the status of the job execution.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ControlledJob {
+  private static final Log LOG = LogFactory.getLog(ControlledJob.class);
+
+  // A job will be in one of the following states
+  public static enum State {SUCCESS, WAITING, RUNNING, READY, FAILED,
+                            DEPENDENT_FAILED}; 
+  public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist";
+  private State state;
+  private String controlID;     // assigned and used by JobControl class
+  private Job job;               // mapreduce job to be executed.
+  // some info for human consumption, e.g. the reason why the job failed
+  private String message;
+  // the jobs the current job depends on
+  private List<ControlledJob> dependingJobs;
+	
+  /** 
+   * Construct a job.
+   * @param job a mapreduce job to be executed.
+   * @param dependingJobs a list of jobs the current job depends on
+   */
+  public ControlledJob(Job job, List<ControlledJob> dependingJobs) 
+      throws IOException {
+    this.job = job;
+    this.dependingJobs = dependingJobs;
+    this.state = State.WAITING;
+    this.controlID = "unassigned";
+    this.message = "just initialized";
+  }
+  
+  /**
+   * Construct a job.
+   * 
+   * @param conf mapred job configuration representing a job to be executed.
+   * @throws IOException
+   */
+  public ControlledJob(Configuration conf) throws IOException {
+    this(new Job(conf), null);
+  }
+	
+  @Override
+  public String toString() {
+    StringBuffer sb = new StringBuffer();
+    sb.append("job name:\t").append(this.job.getJobName()).append("\n");
+    sb.append("job id:\t").append(this.controlID).append("\n");
+    sb.append("job state:\t").append(this.state).append("\n");
+    sb.append("job mapred id:\t").append(this.job.getJobID()).append("\n");
+    sb.append("job message:\t").append(this.message).append("\n");
+		
+    if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
+      sb.append("job has no depending job:\t").append("\n");
+    } else {
+      sb.append("job has ").append(this.dependingJobs.size()).
+         append(" depending jobs:\n");
+      for (int i = 0; i < this.dependingJobs.size(); i++) {
+        sb.append("\t depending job ").append(i).append(":\t");
+        sb.append((this.dependingJobs.get(i)).getJobName()).append("\n");
+      }
+    }
+    return sb.toString();
+  }
+	
+  /**
+   * @return the job name of this job
+   */
+  public String getJobName() {
+    return job.getJobName();
+  }
+	
+  /**
+   * Set the job name for this job.
+   * @param jobName the job name
+   */
+  public void setJobName(String jobName) {
+    job.setJobName(jobName);
+  }
+	
+  /**
+   * @return the job ID of this job assigned by JobControl
+   */
+  public String getJobID() {
+    return this.controlID;
+  }
+	
+  /**
+   * Set the job ID for this job.
+   * @param id the job ID
+   */
+  public void setJobID(String id) {
+    this.controlID = id;
+  }
+	
+  /**
+   * @return the mapred ID of this job as assigned by the 
+   * mapred framework.
+   */
+  public JobID getMapredJobID() {
+    return this.job.getJobID();
+  }
+  
+  /**
+   * @return the mapreduce job 
+   */
+  public synchronized Job getJob() {
+    return this.job;
+  }
+
+  /**
+   * Set the mapreduce job
+   * @param job the mapreduce job for this job.
+   */
+  public synchronized void setJob(Job job) {
+    this.job = job;
+  }
+
+  /**
+   * @return the state of this job
+   */
+  public synchronized State getJobState() {
+    return this.state;
+  }
+	
+  /**
+   * Set the state for this job.
+   * @param state the new state for this job.
+   */
+  protected synchronized void setJobState(State state) {
+    this.state = state;
+  }
+	
+  /**
+   * @return the message of this job
+   */
+  public synchronized String getMessage() {
+    return this.message;
+  }
+
+  /**
+   * Set the message for this job.
+   * @param message the message for this job.
+   */
+  public synchronized void setMessage(String message) {
+    this.message = message;
+  }
+
+  /**
+   * @return the depending jobs of this job
+   */
+  public List<ControlledJob> getDependentJobs() {
+    return this.dependingJobs;
+  }
+  
+  /**
+   * Add a job to this job's dependency list. 
+   * Dependent jobs can only be added while a Job 
+   * is waiting to run, not during or afterwards.
+   * 
+   * @param dependingJob Job that this Job depends on.
+   * @return <tt>true</tt> if the Job was added.
+   */
+  public synchronized boolean addDependingJob(ControlledJob dependingJob) {
+    if (this.state == State.WAITING) { //only allowed to add jobs when waiting
+      if (this.dependingJobs == null) {
+        this.dependingJobs = new ArrayList<ControlledJob>();
+      }
+      return this.dependingJobs.add(dependingJob);
+    } else {
+      return false;
+    }
+  }
+	
+  /**
+   * @return true if this job is in a complete state
+   */
+  public synchronized boolean isCompleted() {
+    return this.state == State.FAILED || 
+      this.state == State.DEPENDENT_FAILED ||
+      this.state == State.SUCCESS;
+  }
+	
+  /**
+   * @return true if this job is in READY state
+   */
+  public synchronized boolean isReady() {
+    return this.state == State.READY;
+  }
+
+  public void killJob() throws IOException, InterruptedException {
+    job.killJob();
+  }
+  
+  public synchronized void failJob(String message) throws IOException, InterruptedException {
+    try {
+      if(job != null && this.state == State.RUNNING) {
+        job.killJob();
+      }
+    } finally {
+      this.state = State.FAILED;
+      this.message = message;
+    }
+  }
+  
+  /**
+   * Check the state of this running job. The state may 
+   * remain the same, become SUCCESS or FAILED.
+   */
+  private void checkRunningState() throws IOException, InterruptedException {
+    try {
+      if (job.isComplete()) {
+        if (job.isSuccessful()) {
+          this.state = State.SUCCESS;
+        } else {
+          this.state = State.FAILED;
+          this.message = "Job failed!";
+        }
+      }
+    } catch (IOException ioe) {
+      this.state = State.FAILED;
+      this.message = StringUtils.stringifyException(ioe);
+      try {
+        if (job != null) {
+          job.killJob();
+        }
+      } catch (IOException e) {}
+    }
+  }
+	
+  /**
+   * Check and update the state of this job. The state changes  
+   * depending on its current state and the states of the depending jobs.
+   */
+  synchronized State checkState() throws IOException, InterruptedException {
+    if (this.state == State.RUNNING) {
+      checkRunningState();
+    }
+    if (this.state != State.WAITING) {
+      return this.state;
+    }
+    if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
+      this.state = State.READY;
+      return this.state;
+    }
+    ControlledJob pred = null;
+    int n = this.dependingJobs.size();
+    for (int i = 0; i < n; i++) {
+      pred = this.dependingJobs.get(i);
+      State s = pred.checkState();
+      if (s == State.WAITING || s == State.READY || s == State.RUNNING) {
+        break; // a pred is still not completed, continue in WAITING
+        // state
+      }
+      if (s == State.FAILED || s == State.DEPENDENT_FAILED) {
+        this.state = State.DEPENDENT_FAILED;
+        this.message = "depending job " + i + " with jobID "
+          + pred.getJobID() + " failed. " + pred.getMessage();
+        break;
+      }
+      // pred must be in success state
+      if (i == n - 1) {
+        this.state = State.READY;
+      }
+    }
+
+    return this.state;
+  }
+	
+  /**
+   * Submit this job to mapred. The state becomes RUNNING if submission 
+   * is successful, FAILED otherwise.  
+   */
+  protected synchronized void submit() {
+    try {
+      Configuration conf = job.getConfiguration();
+      if (conf.getBoolean(CREATE_DIR, false)) {
+        FileSystem fs = FileSystem.get(conf);
+        Path inputPaths[] = FileInputFormat.getInputPaths(job);
+        for (int i = 0; i < inputPaths.length; i++) {
+          if (!fs.exists(inputPaths[i])) {
+            try {
+              fs.mkdirs(inputPaths[i]);
+            } catch (IOException e) {
+
+            }
+          }
+        }
+      }
+      job.submit();
+      this.state = State.RUNNING;
+    } catch (Exception ioe) {
+      LOG.info(getJobName()+" got an error while submitting ",ioe);
+      this.state = State.FAILED;
+      this.message = StringUtils.stringifyException(ioe);
+    }
+  }
+	
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/ControlledJob.java
------------------------------------------------------------------------------
    svn:eol-style = native
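
A minimal sketch of expressing a two-step dependency with ControlledJob; job configuration is elided and the names are illustrative, not part of this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;

    public class TwoStepPipeline {
      public static void main(String[] args) throws Exception {
        Job extract = new Job(new Configuration(), "extract");
        Job aggregate = new Job(new Configuration(), "aggregate");

        ControlledJob cExtract = new ControlledJob(extract, null);
        ControlledJob cAggregate = new ControlledJob(aggregate, null);
        // Dependencies can only be added while the job is still WAITING;
        // cAggregate will not become READY until cExtract reaches SUCCESS.
        cAggregate.addDependingJob(cExtract);
        System.out.println(cAggregate.toString());
      }
    }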

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.jobcontrol;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.State;
+import org.apache.hadoop.util.StringUtils;
+
+/** 
+ *  This class encapsulates a set of MapReduce jobs and their dependencies.
+ *   
+ *  It tracks the states of the jobs by placing them into different tables
+ *  according to their states. 
+ *  
+ *  This class provides APIs for the client app to add a job to the group 
+ *  and to get the jobs in the group in different states. When a job is 
+ *  added, an ID unique to the group is assigned to the job. 
+ *  
+ *  This class has a thread that submits jobs when they become ready, 
+ *  monitors the states of the running jobs, and updates the states of jobs
+ *  based on the state changes of the jobs they depend on. The class 
+ *  provides APIs for suspending, resuming, and stopping the thread.
+ *  
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class JobControl implements Runnable {
+  private static final Log LOG = LogFactory.getLog(JobControl.class);
+
+  // The thread can be in one of the following states
+  public static enum ThreadState {RUNNING, SUSPENDED, STOPPED, STOPPING, READY};
+	
+  private ThreadState runnerState;			// the thread state
+	
+  private LinkedList<ControlledJob> jobsInProgress = new LinkedList<ControlledJob>();
+  private LinkedList<ControlledJob> successfulJobs = new LinkedList<ControlledJob>();
+  private LinkedList<ControlledJob> failedJobs = new LinkedList<ControlledJob>();
+	
+  private long nextJobID;
+  private String groupName;
+	
+  /** 
+   * Construct a job control for a group of jobs.
+   * @param groupName a name identifying this group
+   */
+  public JobControl(String groupName) {
+    this.nextJobID = -1;
+    this.groupName = groupName;
+    this.runnerState = ThreadState.READY;
+  }
+	
+  private static List<ControlledJob> toList(
+                   LinkedList<ControlledJob> jobs) {
+    ArrayList<ControlledJob> retv = new ArrayList<ControlledJob>();
+    synchronized (jobs) {
+      for (ControlledJob job : jobs) {
+        retv.add(job);
+      }
+    }
+    return retv;
+  }
+	
+  synchronized private List<ControlledJob> getJobsIn(State state) {
+    LinkedList<ControlledJob> l = new LinkedList<ControlledJob>();
+    for(ControlledJob j: jobsInProgress) {
+      if(j.getJobState() == state) {
+        l.add(j);
+      }
+    }
+    return l;
+  }
+  
+  /**
+   * @return the jobs in the waiting state
+   */
+  public List<ControlledJob> getWaitingJobList() {
+    return getJobsIn(State.WAITING);
+  }
+	
+  /**
+   * @return the jobs in the running state
+   */
+  public List<ControlledJob> getRunningJobList() {
+    return getJobsIn(State.RUNNING);
+  }
+	
+  /**
+   * @return the jobs in the ready state
+   */
+  public List<ControlledJob> getReadyJobsList() {
+    return getJobsIn(State.READY);
+  }
+	
+  /**
+   * @return the jobs in the success state
+   */
+  public List<ControlledJob> getSuccessfulJobList() {
+    return toList(this.successfulJobs);
+  }
+	
+  public List<ControlledJob> getFailedJobList() {
+    return toList(this.failedJobs);
+  }
+	
+  private String getNextJobID() {
+    nextJobID += 1;
+    return this.groupName + this.nextJobID;
+  }
+
+  /**
+   * Add a new job.
+   * @param aJob the new job
+   */
+  synchronized public String addJob(ControlledJob aJob) {
+    String id = this.getNextJobID();
+    aJob.setJobID(id);
+    aJob.setJobState(State.WAITING);
+    jobsInProgress.add(aJob);
+    return id;	
+  }
+	
+  /**
+   * Add a collection of jobs
+   * 
+   * @param jobs
+   */
+  public void addJobCollection(Collection<ControlledJob> jobs) {
+    for (ControlledJob job : jobs) {
+      addJob(job);
+    }
+  }
+	
+  /**
+   * @return the thread state
+   */
+  public ThreadState getThreadState() {
+    return this.runnerState;
+  }
+	
+  /**
+   * set the thread state to STOPPING so that the 
+   * thread will stop when it wakes up.
+   */
+  public void stop() {
+    this.runnerState = ThreadState.STOPPING;
+  }
+	
+  /**
+   * suspend the running thread
+   */
+  public void suspend () {
+    if (this.runnerState == ThreadState.RUNNING) {
+      this.runnerState = ThreadState.SUSPENDED;
+    }
+  }
+	
+  /**
+   * resume the suspended thread
+   */
+  public void resume () {
+    if (this.runnerState == ThreadState.SUSPENDED) {
+      this.runnerState = ThreadState.RUNNING;
+    }
+  }
+	
+  synchronized public boolean allFinished() {
+    return jobsInProgress.isEmpty();
+  }
+	
+  /**
+   *  The main loop for the thread.
+   *  The loop does the following:
+   *  	Check the states of the running jobs
+   *  	Update the states of waiting jobs
+   *  	Submit the jobs in ready state
+   */
+  public void run() {
+    try {
+      this.runnerState = ThreadState.RUNNING;
+      while (true) {
+        while (this.runnerState == ThreadState.SUSPENDED) {
+          try {
+            Thread.sleep(5000);
+          }
+          catch (Exception e) {
+            //TODO the thread was interrupted, do something!!!
+          }
+        }
+        
+        synchronized(this) {
+          Iterator<ControlledJob> it = jobsInProgress.iterator();
+          while(it.hasNext()) {
+            ControlledJob j = it.next();
+            LOG.debug("Checking state of job "+j);
+            switch(j.checkState()) {
+            case SUCCESS:
+              successfulJobs.add(j);
+              it.remove();
+              break;
+            case FAILED:
+            case DEPENDENT_FAILED:
+              failedJobs.add(j);
+              it.remove();
+              break;
+            case READY:
+              j.submit();
+              break;
+            case RUNNING:
+            case WAITING:
+              //Do Nothing
+              break;
+            }
+          }
+        }
+        
+        if (this.runnerState != ThreadState.RUNNING && 
+            this.runnerState != ThreadState.SUSPENDED) {
+          break;
+        }
+        try {
+          Thread.sleep(5000);
+        }
+        catch (Exception e) {
+          //TODO the thread was interrupted, do something!!!
+        }
+        if (this.runnerState != ThreadState.RUNNING && 
+            this.runnerState != ThreadState.SUSPENDED) {
+          break;
+        }
+      }
+    }catch(Throwable t) {
+      LOG.error("Error while trying to run jobs.",t);
+      //Mark all jobs as failed because we got something bad.
+      failAllJobs(t);
+    }
+    this.runnerState = ThreadState.STOPPED;
+  }
+
+  synchronized private void failAllJobs(Throwable t) {
+    String message = "Unexpected System Error Occurred: "+
+    StringUtils.stringifyException(t);
+    Iterator<ControlledJob> it = jobsInProgress.iterator();
+    while(it.hasNext()) {
+      ControlledJob j = it.next();
+      try {
+        j.failJob(message);
+      } catch (IOException e) {
+        LOG.error("Error while trying to clean up "+j.getJobName(), e);
+      } catch (InterruptedException e) {
+        LOG.error("Error while trying to clean up "+j.getJobName(), e);
+      } finally {
+        failedJobs.add(j);
+        it.remove();
+      }
+    }
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java
------------------------------------------------------------------------------
    svn:eol-style = native
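
JobControl is itself a Runnable, so a common pattern is to run it on a daemon thread and poll allFinished(). A minimal sketch, with illustrative names, not part of this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

    public class PipelineDriver {
      public static void main(String[] args) throws Exception {
        JobControl control = new JobControl("pipeline");
        control.addJob(
            new ControlledJob(new Job(new Configuration(), "step-1"), null));

        Thread runner = new Thread(control, "jobcontrol");
        runner.setDaemon(true);   // let the JVM exit once the work is done
        runner.start();
        while (!control.allFinished()) {
          Thread.sleep(1000);     // the control thread itself wakes every 5s
        }
        control.stop();
        System.out.println("failed jobs: " + control.getFailedJobList().size());
      }
    }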

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java?rev=1235548&r1=1235547&r2=1235548&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java Tue Jan 24 23:21:58 2012
@@ -39,6 +39,9 @@ import org.apache.hadoop.mapreduce.secur
 
 /** A base class for {@link OutputFormat}s that read from {@link FileSystem}s.*/
 public abstract class FileOutputFormat<K, V> extends OutputFormat<K, V> {
+  
+  protected static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
+  protected static final String PART = "part";
 
   public static enum Counter { 
     BYTES_WRITTEN
@@ -263,8 +266,22 @@ public abstract class FileOutputFormat<K
                                  String extension) throws IOException{
     FileOutputCommitter committer = 
       (FileOutputCommitter) getOutputCommitter(context);
-    return new Path(committer.getWorkPath(), getUniqueFile(context, "part", 
-                                                           extension));
+    return new Path(committer.getWorkPath(), getUniqueFile(context, 
+        getOutputName(context), extension));
+  }
+  
+  /**
+   * Get the base output name for the output file.
+   */
+  protected static String getOutputName(JobContext job) {
+    return job.getConfiguration().get(BASE_OUTPUT_NAME, PART);
+  }
+
+  /**
+   * Set the base output name for output file to be created.
+   */
+  protected static void setOutputName(JobContext job, String name) {
+    job.getConfiguration().set(BASE_OUTPUT_NAME, name);
   }
 
   public synchronized 
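
Since getOutputName reads BASE_OUTPUT_NAME from the job configuration with "part" as the default, the base name of output files can now be changed per job. setOutputName is protected, so a driver would set the property directly; a sketch with illustrative names, not part of this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class BasenameExample {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "renamed-output");
        // Reducer output files become e.g. data-r-00000 instead of
        // part-r-00000 once this property is set.
        job.getConfiguration().set("mapreduce.output.basename", "data");
      }
    }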

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/output/FilterOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/output/FilterOutputFormat.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/output/FilterOutputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/output/FilterOutputFormat.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * FilterOutputFormat is a convenience class that wraps OutputFormat. 
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class FilterOutputFormat <K,V> extends OutputFormat<K, V> {
+
+  protected OutputFormat<K,V> baseOut;
+
+  public FilterOutputFormat() {
+    this.baseOut = null;
+  }
+  
+  /**
+   * Create a FilterOutputFormat based on the underlying output format.
+   * @param baseOut the underlying OutputFormat
+   */
+  public FilterOutputFormat(OutputFormat<K,V> baseOut) {
+    this.baseOut = baseOut;
+  }
+
+  @Override
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) 
+  throws IOException, InterruptedException {
+    return getBaseOut().getRecordWriter(context);
+  }
+
+  @Override
+  public void checkOutputSpecs(JobContext context) 
+  throws IOException, InterruptedException {
+    getBaseOut().checkOutputSpecs(context);
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context) 
+  throws IOException, InterruptedException {
+    return getBaseOut().getOutputCommitter(context);
+  }
+
+  private OutputFormat<K,V> getBaseOut() throws IOException {
+    if (baseOut == null) {
+      throw new IOException("OutputFormat not set for FilterOutputFormat");
+    }
+    return baseOut;
+  }
+  /**
+   * <code>FilterRecordWriter</code> is a convenience wrapper
+   * class that extends the {@link RecordWriter}.
+   */
+
+  public static class FilterRecordWriter<K,V> extends RecordWriter<K,V> {
+
+    protected RecordWriter<K,V> rawWriter = null;
+
+    public FilterRecordWriter() {
+      rawWriter = null;
+    }
+    
+    public FilterRecordWriter(RecordWriter<K,V> rwriter) {
+      this.rawWriter = rwriter;
+    }
+    
+    @Override
+    public void write(K key, V value) throws IOException, InterruptedException {
+      getRawWriter().write(key, value);
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) 
+    throws IOException, InterruptedException {
+      getRawWriter().close(context);
+    }
+    
+    private RecordWriter<K,V> getRawWriter() throws IOException {
+      if (rawWriter == null) {
+        throw new IOException("Record Writer not set for FilterRecordWriter");
+      }
+      return rawWriter;
+    }
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/output/FilterOutputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native
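
The protected baseOut field and the nested FilterRecordWriter are the intended extension points. A sketch of a pass-through format that counts records as they are written; the class names are illustrative, not part of this commit:

    import java.io.IOException;

    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FilterOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class CountingOutputFormat<K, V> extends FilterOutputFormat<K, V> {

      public CountingOutputFormat() {
        super(new TextOutputFormat<K, V>());  // delegate everything to text output
      }

      @Override
      public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
          throws IOException, InterruptedException {
        return new FilterRecordWriter<K, V>(super.getRecordWriter(context)) {
          private long written;

          @Override
          public void write(K key, V value)
              throws IOException, InterruptedException {
            written++;                  // observe, then delegate
            super.write(key, value);
          }

          @Override
          public void close(TaskAttemptContext c)
              throws IOException, InterruptedException {
            System.out.println("records written: " + written);
            super.close(c);
          }
        };
      }
    }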

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/output/LazyOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/output/LazyOutputFormat.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/output/LazyOutputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/output/LazyOutputFormat.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A convenience class that creates output lazily.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class LazyOutputFormat <K,V> extends FilterOutputFormat<K, V> {
+  public static String OUTPUT_FORMAT = 
+    "mapreduce.output.lazyoutputformat.outputformat";
+  /**
+   * Set the underlying output format for LazyOutputFormat.
+   * @param job the {@link Job} to modify
+   * @param theClass the underlying class
+   */
+  @SuppressWarnings("unchecked")
+  public static void  setOutputFormatClass(Job job, 
+                                     Class<? extends OutputFormat> theClass) {
+      job.setOutputFormatClass(LazyOutputFormat.class);
+      job.getConfiguration().setClass(OUTPUT_FORMAT, 
+          theClass, OutputFormat.class);
+  }
+
+  @SuppressWarnings("unchecked")
+  private void getBaseOutputFormat(Configuration conf) 
+  throws IOException {
+    Class<?> outputFormatClass = conf.getClass(OUTPUT_FORMAT, null);
+    if (outputFormatClass == null) {
+      // Check before instantiation; ReflectionUtils.newInstance(null, conf)
+      // would throw a NullPointerException instead of this clearer error.
+      throw new IOException("Output Format not set for LazyOutputFormat");
+    }
+    baseOut = (OutputFormat<K, V>) ReflectionUtils.newInstance(
+      outputFormatClass, conf);
+  }
+
+  @Override
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+  throws IOException, InterruptedException {
+    if (baseOut == null) {
+      getBaseOutputFormat(context.getConfiguration());
+    }
+    return new LazyRecordWriter<K, V>(baseOut, context);
+  }
+  
+  @Override
+  public void checkOutputSpecs(JobContext context) 
+  throws IOException, InterruptedException {
+    if (baseOut == null) {
+      getBaseOutputFormat(context.getConfiguration());
+    }
+    super.checkOutputSpecs(context);
+  }
+  
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context) 
+  throws IOException, InterruptedException {
+    if (baseOut == null) {
+      getBaseOutputFormat(context.getConfiguration());
+    }
+    return super.getOutputCommitter(context);
+  }
+  
+  /**
+   * A convenience class to be used with LazyOutputFormat
+   */
+  private static class LazyRecordWriter<K,V> extends FilterRecordWriter<K,V> {
+
+    final OutputFormat<K,V> outputFormat;
+    final TaskAttemptContext taskContext;
+
+    public LazyRecordWriter(OutputFormat<K,V> out, 
+                            TaskAttemptContext taskContext)
+    throws IOException, InterruptedException {
+      this.outputFormat = out;
+      this.taskContext = taskContext;
+    }
+
+    @Override
+    public void write(K key, V value) throws IOException, InterruptedException {
+      if (rawWriter == null) {
+        rawWriter = outputFormat.getRecordWriter(taskContext);
+      }
+      rawWriter.write(key, value);
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) 
+    throws IOException, InterruptedException {
+      if (rawWriter != null) {
+        rawWriter.close(context);
+      }
+    }
+
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/output/LazyOutputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native
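
Typical wiring, sketched below with illustrative names: keep TextOutputFormat as the real format, but since LazyRecordWriter defers getRecordWriter until the first write, part files are created only for tasks that actually emit records:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class LazyOutputExample {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "lazy-output");
        // Registers LazyOutputFormat as the job's OutputFormat and records
        // TextOutputFormat as the underlying format to instantiate lazily.
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
      }
    }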

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,468 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.output;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * The MultipleOutputs class simplifies writing output data 
+ * to multiple outputs
+ * 
+ * <p> 
+ * Case one: writing to additional outputs other than the job default output.
+ *
+ * Each additional output, or named output, may be configured with its own
+ * <code>OutputFormat</code>, with its own key class and with its own value
+ * class.
+ * 
+ * <p>
+ * Case two: writing data to different files provided by the user
+ * </p>
+ * 
+ * <p>
+ * MultipleOutputs supports counters; by default they are disabled. The 
+ * counters group is the {@link MultipleOutputs} class name. The names of the 
+ * counters are the same as the output names. These count the number of 
+ * records written to each output name.
+ * </p>
+ * 
+ * Usage pattern for job submission:
+ * <pre>
+ *
+ * Job job = new Job();
+ *
+ * FileInputFormat.setInputPath(job, inDir);
+ * FileOutputFormat.setOutputPath(job, outDir);
+ *
+ * job.setMapperClass(MOMap.class);
+ * job.setReducerClass(MOReduce.class);
+ * ...
+ *
+ * // Defines additional single text based output 'text' for the job
+ * MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
+ * LongWritable.class, Text.class);
+ *
+ * // Defines additional sequence-file based output 'sequence' for the job
+ * MultipleOutputs.addNamedOutput(job, "seq",
+ *   SequenceFileOutputFormat.class,
+ *   LongWritable.class, Text.class);
+ * ...
+ *
+ * job.waitForCompletion(true);
+ * ...
+ * </pre>
+ * <p>
+ * Usage in Reducer:
+ * <pre>
+ * <K, V> String generateFileName(K k, V v) {
+ *   return k.toString() + "_" + v.toString();
+ * }
+ * 
+ * public class MOReduce extends
+ *   Reducer&lt;WritableComparable, Writable,WritableComparable, Writable&gt; {
+ * private MultipleOutputs mos;
+ * public void setup(Context context) {
+ * ...
+ * mos = new MultipleOutputs(context);
+ * }
+ *
+ * public void reduce(WritableComparable key, Iterable&lt;Writable&gt; values,
+ * Context context)
+ * throws IOException {
+ * ...
+ * mos.write("text", key, new Text("Hello"));
+ * mos.write("seq", new LongWritable(1), new Text("Bye"), "seq_a");
+ * mos.write("seq", new LongWritable(2), key, new Text("Chau"), "seq_b");
+ * mos.write(key, new Text("value"), generateFileName(key, new Text("value")));
+ * ...
+ * }
+ *
+ * public void cleanup(Context) throws IOException {
+ * mos.close();
+ * ...
+ * }
+ *
+ * }
+ * </pre>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class MultipleOutputs<KEYOUT, VALUEOUT> {
+
+  private static final String MULTIPLE_OUTPUTS = "mapreduce.multipleoutputs";
+
+  private static final String MO_PREFIX = 
+    "mapreduce.multipleoutputs.namedOutput.";
+
+  private static final String FORMAT = ".format";
+  private static final String KEY = ".key";
+  private static final String VALUE = ".value";
+  private static final String COUNTERS_ENABLED = 
+    "mapreduce.multipleoutputs.counters";
+
+  /**
+   * Counters group used by the counters of MultipleOutputs.
+   */
+  private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();
+
+  /**
+   * Cache for the taskContexts
+   */
+  private Map<String, TaskAttemptContext> taskContexts = new HashMap<String, TaskAttemptContext>();
+  /**
+   * Cached TaskAttemptContext which uses the job's configured settings
+   */
+  private TaskAttemptContext jobOutputFormatContext;
+
+  /**
+   * Checks if a named output name is valid token.
+   *
+   * @param namedOutput named output Name
+   * @throws IllegalArgumentException if the output name is not valid.
+   */
+  private static void checkTokenName(String namedOutput) {
+    if (namedOutput == null || namedOutput.length() == 0) {
+      throw new IllegalArgumentException(
+        "Name cannot be NULL or empty");
+    }
+    for (char ch : namedOutput.toCharArray()) {
+      if ((ch >= 'A') && (ch <= 'Z')) {
+        continue;
+      }
+      if ((ch >= 'a') && (ch <= 'z')) {
+        continue;
+      }
+      if ((ch >= '0') && (ch <= '9')) {
+        continue;
+      }
+      throw new IllegalArgumentException(
+        "Name cannot have a '" + ch + "' char");
+    }
+  }
+
+  /**
+   * Checks if output name is valid.
+   *
+   * name cannot be the name used for the default output
+   * @param outputPath base output Name
+   * @throws IllegalArgumentException if the output name is not valid.
+   */
+  private static void checkBaseOutputPath(String outputPath) {
+    if (outputPath.equals(FileOutputFormat.PART)) {
+      throw new IllegalArgumentException("output name cannot be 'part'");
+    }
+  }
+  
+  /**
+   * Checks if a named output name is valid.
+   *
+   * @param namedOutput named output Name
+   * @throws IllegalArgumentException if the output name is not valid.
+   */
+  private static void checkNamedOutputName(JobContext job,
+      String namedOutput, boolean alreadyDefined) {
+    checkTokenName(namedOutput);
+    checkBaseOutputPath(namedOutput);
+    List<String> definedChannels = getNamedOutputsList(job);
+    if (alreadyDefined && definedChannels.contains(namedOutput)) {
+      throw new IllegalArgumentException("Named output '" + namedOutput +
+        "' already alreadyDefined");
+    } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) {
+      throw new IllegalArgumentException("Named output '" + namedOutput +
+        "' not defined");
+    }
+  }
+
+  // Returns list of channel names.
+  private static List<String> getNamedOutputsList(JobContext job) {
+    List<String> names = new ArrayList<String>();
+    StringTokenizer st = new StringTokenizer(
+      job.getConfiguration().get(MULTIPLE_OUTPUTS, ""), " ");
+    while (st.hasMoreTokens()) {
+      names.add(st.nextToken());
+    }
+    return names;
+  }
+
+  // Returns the named output OutputFormat.
+  @SuppressWarnings("unchecked")
+  private static Class<? extends OutputFormat<?, ?>> getNamedOutputFormatClass(
+    JobContext job, String namedOutput) {
+    return (Class<? extends OutputFormat<?, ?>>)
+      job.getConfiguration().getClass(MO_PREFIX + namedOutput + FORMAT, null,
+      OutputFormat.class);
+  }
+
+  // Returns the key class for a named output.
+  private static Class<?> getNamedOutputKeyClass(JobContext job,
+                                                String namedOutput) {
+    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + KEY, null,
+      Object.class);
+  }
+
+  // Returns the value class for a named output.
+  private static Class<?> getNamedOutputValueClass(
+      JobContext job, String namedOutput) {
+    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + VALUE,
+      null, Object.class);
+  }
+
+  /**
+   * Adds a named output for the job.
+   *
+   * @param job               job to add the named output
+   * @param namedOutput       named output name; it must be a word of letters
+   *                          and numbers only, and cannot be the word 'part',
+   *                          which is reserved for the default output.
+   * @param outputFormatClass OutputFormat class.
+   * @param keyClass          key class
+   * @param valueClass        value class
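+   *
+   * <p>A minimal driver-side sketch; the named output and the classes shown
+   * here are illustrative:
+   * <pre>
+   * MultipleOutputs.addNamedOutput(job, "text",
+   *   TextOutputFormat.class, LongWritable.class, Text.class);
+   * </pre>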
+   */
+  @SuppressWarnings("unchecked")
+  public static void addNamedOutput(Job job, String namedOutput,
+      Class<? extends OutputFormat> outputFormatClass,
+      Class<?> keyClass, Class<?> valueClass) {
+    checkNamedOutputName(job, namedOutput, true);
+    Configuration conf = job.getConfiguration();
+    conf.set(MULTIPLE_OUTPUTS,
+      conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput);
+    conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
+      OutputFormat.class);
+    conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class);
+    conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class);
+  }
+
+  /**
+   * Enables or disables counters for the named outputs.
+   * 
+   * The counters group is the {@link MultipleOutputs} class name.
+   * The names of the counters are the same as the named outputs. These
+   * counters count the number of records written to each output name.
+   * By default these counters are disabled.
+   *
+   * @param job     the job to enable counters for
+   * @param enabled indicates if the counters will be enabled or not.
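+   *
+   * <p>A sketch of reading such a counter after job completion; the named
+   * output "text" is assumed to be defined:
+   * <pre>
+   * // counter name matches the (assumed) named output "text"
+   * long written = job.getCounters().findCounter(
+   *   MultipleOutputs.class.getName(), "text").getValue();
+   * </pre>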
+   */
+  public static void setCountersEnabled(Job job, boolean enabled) {
+    job.getConfiguration().setBoolean(COUNTERS_ENABLED, enabled);
+  }
+
+  /**
+   * Returns if the counters for the named outputs are enabled or not.
+   * By default these counters are disabled.
+   *
+   * @param job    the job 
+   * @return <code>true</code> if the counters are enabled, <code>false</code>
+   * if they are disabled.
+   */
+  public static boolean getCountersEnabled(JobContext job) {
+    return job.getConfiguration().getBoolean(COUNTERS_ENABLED, false);
+  }
+
+  /**
+   * Wraps RecordWriter to increment counters. 
+   */
+  @SuppressWarnings("unchecked")
+  private static class RecordWriterWithCounter extends RecordWriter {
+    private RecordWriter writer;
+    private String counterName;
+    private TaskInputOutputContext context;
+
+    public RecordWriterWithCounter(RecordWriter writer, String counterName,
+                                   TaskInputOutputContext context) {
+      this.writer = writer;
+      this.counterName = counterName;
+      this.context = context;
+    }
+
+    @SuppressWarnings({"unchecked"})
+    public void write(Object key, Object value) 
+        throws IOException, InterruptedException {
+      context.getCounter(COUNTERS_GROUP, counterName).increment(1);
+      writer.write(key, value);
+    }
+
+    public void close(TaskAttemptContext context) 
+        throws IOException, InterruptedException {
+      writer.close(context);
+    }
+  }
+
+  // instance code, to be used from Mapper/Reducer code
+
+  private TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context;
+  private Set<String> namedOutputs;
+  private Map<String, RecordWriter<?, ?>> recordWriters;
+  private boolean countersEnabled;
+  
+  /**
+   * Creates and initializes multiple outputs support; it should be
+   * instantiated in the Mapper/Reducer setup method.
+   *
+   * @param context the TaskInputOutputContext object
+   */
+  public MultipleOutputs(
+      TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
+    this.context = context;
+    namedOutputs = Collections.unmodifiableSet(
+      new HashSet<String>(MultipleOutputs.getNamedOutputsList(context)));
+    recordWriters = new HashMap<String, RecordWriter<?, ?>>();
+    countersEnabled = getCountersEnabled(context);
+  }
+
+  /**
+   * Write key and value to the namedOutput.
+   *
+   * The output path is a unique file generated for the namedOutput,
+   * for example {namedOutput}-(m|r)-{part-number}.
+   * 
+   * @param namedOutput the named output name
+   * @param key         the key
+   * @param value       the value
+   */
+  @SuppressWarnings("unchecked")
+  public <K, V> void write(String namedOutput, K key, V value)
+      throws IOException, InterruptedException {
+    write(namedOutput, key, value, namedOutput);
+  }
+
+  /**
+   * Write key and value to baseOutputPath using the namedOutput.
+   * 
+   * @param namedOutput    the named output name
+   * @param key            the key
+   * @param value          the value
+   * @param baseOutputPath base-output path to write the record to.
+   * Note: the framework will generate a unique filename for the baseOutputPath.
+   */
+  @SuppressWarnings("unchecked")
+  public <K, V> void write(String namedOutput, K key, V value,
+      String baseOutputPath) throws IOException, InterruptedException {
+    checkNamedOutputName(context, namedOutput, false);
+    checkBaseOutputPath(baseOutputPath);
+    if (!namedOutputs.contains(namedOutput)) {
+      throw new IllegalArgumentException("Undefined named output '" +
+        namedOutput + "'");
+    }
+    TaskAttemptContext taskContext = getContext(namedOutput);
+    getRecordWriter(taskContext, baseOutputPath).write(key, value);
+  }
+
+  /**
+   * Write key and value to an output file name.
+   * 
+   * Gets the record writer from the job's output format. The job's output
+   * format should be a FileOutputFormat.
+   * 
+   * @param key       the key
+   * @param value     the value
+   * @param baseOutputPath base-output path to write the record to.
+   * Note: the framework will generate a unique filename for the baseOutputPath.
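+   *
+   * <p>As in the class-level example above, the baseOutputPath can be
+   * derived per record (generateFileName is the helper shown there):
+   * <pre>
+   * mos.write(key, new Text("value"), generateFileName(key, new Text("value")));
+   * </pre>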
+   */
+  @SuppressWarnings("unchecked")
+  public void write(KEYOUT key, VALUEOUT value, String baseOutputPath) 
+      throws IOException, InterruptedException {
+    checkBaseOutputPath(baseOutputPath);
+    if (jobOutputFormatContext == null) {
+      jobOutputFormatContext = 
+        new TaskAttemptContext(context.getConfiguration(), 
+                               context.getTaskAttemptID());
+    }
+    getRecordWriter(jobOutputFormatContext, baseOutputPath).write(key, value);
+  }
+
+  // By being synchronized, MultipleOutputs can be used with a
+  // MultithreadedMapper.
+  @SuppressWarnings("unchecked")
+  private synchronized RecordWriter getRecordWriter(
+      TaskAttemptContext taskContext, String baseFileName) 
+      throws IOException, InterruptedException {
+    
+    // look for record-writer in the cache
+    RecordWriter writer = recordWriters.get(baseFileName);
+    
+    // If not in cache, create a new one
+    if (writer == null) {
+      // get the record writer from context output format
+      FileOutputFormat.setOutputName(taskContext, baseFileName);
+      try {
+        writer = ((OutputFormat) ReflectionUtils.newInstance(
+          taskContext.getOutputFormatClass(), taskContext.getConfiguration()))
+          .getRecordWriter(taskContext);
+      } catch (ClassNotFoundException e) {
+        throw new IOException(e);
+      }
+ 
+      // if counters are enabled, wrap the writer with context 
+      // to increment counters 
+      if (countersEnabled) {
+        writer = new RecordWriterWithCounter(writer, baseFileName, context);
+      }
+      
+      // add the record-writer to the cache
+      recordWriters.put(baseFileName, writer);
+    }
+    return writer;
+  }
+
+  // Create a taskAttemptContext for the named output with
+  // output format and output key/value types put in the context.
+  private TaskAttemptContext getContext(String nameOutput) throws IOException {
+      
+    TaskAttemptContext taskContext = taskContexts.get(nameOutput);
+    
+    if (taskContext != null) {
+      return taskContext;
+    }
+    
+    // The following trick leverages the instantiation of a record writer via
+    // the job, thus supporting arbitrary output formats.
+    Job job = new Job(context.getConfiguration());
+    job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
+    job.setOutputKeyClass(getNamedOutputKeyClass(context, nameOutput));
+    job.setOutputValueClass(getNamedOutputValueClass(context, nameOutput));
+    taskContext = new TaskAttemptContext(job.getConfiguration(), context
+        .getTaskAttemptID());
+
+    taskContexts.put(nameOutput, taskContext);
+
+    return taskContext;
+  }
+
+  /**
+   * Closes all the opened outputs.
+   * 
+   * This should be called from the cleanup method of a map/reduce task.
+   * If overridden, subclasses must invoke <code>super.close()</code> at the
+   * end of their <code>close()</code>.
+   * 
+   */
+  @SuppressWarnings("unchecked")
+  public void close() throws IOException, InterruptedException {
+    for (RecordWriter writer : recordWriters.values()) {
+      writer.close(context);
+    }
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/output/SequenceFileAsBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/output/SequenceFileAsBinaryOutputFormat.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/output/SequenceFileAsBinaryOutputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/output/SequenceFileAsBinaryOutputFormat.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+import java.io.DataOutputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.ValueBytes;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/** 
+ * An {@link org.apache.hadoop.mapreduce.OutputFormat} that writes keys and
+ * values to {@link SequenceFile}s in binary (raw) format.
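+ *
+ * <p>A minimal driver-side sketch; the key/value classes are illustrative:
+ * <pre>
+ * job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);
+ * SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job, Text.class);
+ * SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(job, LongWritable.class);
+ * </pre>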
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class SequenceFileAsBinaryOutputFormat 
+    extends SequenceFileOutputFormat <BytesWritable,BytesWritable> {
+  public static final String KEY_CLASS = "mapreduce.output.seqbinaryoutputformat.key.class";
+  public static final String VALUE_CLASS = "mapreduce.output.seqbinaryoutputformat.value.class";
+
+  /** 
+   * Inner class used for appendRaw
+   */
+  static public class WritableValueBytes implements ValueBytes {
+    private BytesWritable value;
+
+    public WritableValueBytes() {
+      this.value = null;
+    }
+    
+    public WritableValueBytes(BytesWritable value) {
+      this.value = value;
+    }
+
+    public void reset(BytesWritable value) {
+      this.value = value;
+    }
+
+    public void writeUncompressedBytes(DataOutputStream outStream)
+        throws IOException {
+      outStream.write(value.getBytes(), 0, value.getLength());
+    }
+
+    public void writeCompressedBytes(DataOutputStream outStream)
+        throws IllegalArgumentException, IOException {
+      throw new UnsupportedOperationException(
+        "WritableValueBytes doesn't support RECORD compression"); 
+    }
+    
+    public int getSize() {
+      return value.getLength();
+    }
+  }
+
+  /**
+   * Set the key class for the {@link SequenceFile}
+   * <p>This allows the user to specify the key class to be different 
+   * from the actual class ({@link BytesWritable}) used for writing.</p>
+   * 
+   * @param job the {@link Job} to modify
+   * @param theClass the SequenceFile output key class.
+   */
+  static public void setSequenceFileOutputKeyClass(Job job, 
+      Class<?> theClass) {
+    job.getConfiguration().setClass(KEY_CLASS,
+      theClass, Object.class);
+  }
+
+  /**
+   * Set the value class for the {@link SequenceFile}
+   * <p>This allows the user to specify the value class to be different 
+   * from the actual class ({@link BytesWritable}) used for writing.</p>
+   * 
+   * @param job the {@link Job} to modify
+   * @param theClass the SequenceFile output value class.
+   */
+  static public void setSequenceFileOutputValueClass(Job job, 
+      Class<?> theClass) {
+    job.getConfiguration().setClass(VALUE_CLASS, 
+                  theClass, Object.class);
+  }
+
+  /**
+   * Get the key class for the {@link SequenceFile}
+   * 
+   * @return the key class of the {@link SequenceFile}
+   */
+  static public Class<? extends WritableComparable> 
+      getSequenceFileOutputKeyClass(JobContext job) { 
+    return job.getConfiguration().getClass(KEY_CLASS,
+      job.getOutputKeyClass().asSubclass(WritableComparable.class), 
+      WritableComparable.class);
+  }
+
+  /**
+   * Get the value class for the {@link SequenceFile}
+   * 
+   * @return the value class of the {@link SequenceFile}
+   */
+  static public Class<? extends Writable> getSequenceFileOutputValueClass(
+      JobContext job) { 
+    return job.getConfiguration().getClass(VALUE_CLASS, 
+      job.getOutputValueClass().asSubclass(Writable.class), Writable.class);
+  }
+  
+  @Override 
+  public RecordWriter<BytesWritable, BytesWritable> getRecordWriter(
+      TaskAttemptContext context) throws IOException {
+    final SequenceFile.Writer out = getSequenceWriter(context,
+      getSequenceFileOutputKeyClass(context),
+      getSequenceFileOutputValueClass(context)); 
+
+    return new RecordWriter<BytesWritable, BytesWritable>() {
+      private WritableValueBytes wvaluebytes = new WritableValueBytes();
+
+      public void write(BytesWritable bkey, BytesWritable bvalue)
+        throws IOException {
+        wvaluebytes.reset(bvalue);
+        out.appendRaw(bkey.getBytes(), 0, bkey.getLength(), wvaluebytes);
+        wvaluebytes.reset(null);
+      }
+
+      public void close(TaskAttemptContext context) throws IOException { 
+        out.close();
+      }
+    };
+  }
+
+  protected SequenceFile.Writer getSequenceWriter(TaskAttemptContext context,
+      Class<?> keyClass, Class<?> valueClass)
+      throws IOException {
+    Configuration conf = context.getConfiguration();
+
+    CompressionCodec codec = null;
+    CompressionType compressionType = CompressionType.NONE;
+    if (getCompressOutput(context)) {
+      // find the kind of compression to do
+      compressionType = getOutputCompressionType(context);
+      // find the right codec
+      Class<?> codecClass = getOutputCompressorClass(context,
+                                                     DefaultCodec.class);
+      codec = (CompressionCodec)
+        ReflectionUtils.newInstance(codecClass, conf);
+    }
+    // get the path of the temporary output file
+    Path file = getDefaultWorkFile(context, "");
+    FileSystem fs = file.getFileSystem(conf);
+    return SequenceFile.createWriter(fs, conf, file,
+             keyClass,
+             valueClass,
+             compressionType,
+             codec,
+             context);
+  }
+
+  @Override 
+  public void checkOutputSpecs(JobContext job) throws IOException {
+    super.checkOutputSpecs(job);
+    if (getCompressOutput(job) && 
+        getOutputCompressionType(job) == CompressionType.RECORD ) {
+      throw new InvalidJobConfException("SequenceFileAsBinaryOutputFormat "
+        + "doesn't support Record Compression" );
+    }
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/output/SequenceFileAsBinaryOutputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/partition/BinaryPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/partition/BinaryPartitioner.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/partition/BinaryPartitioner.java (added)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/partition/BinaryPartitioner.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.partition;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+/**
+ * <p>Partition {@link BinaryComparable} keys using a configurable part of 
+ * the bytes array returned by {@link BinaryComparable#getBytes()}.</p>
+ * 
+ * <p>The subarray to be used for the partitioning can be defined by means
+ * of the following properties:
+ * <ul>
+ *   <li>
+ *     <i>mapred.binary.partitioner.left.offset</i>:
+ *     left offset in array (0 by default)
+ *   </li>
+ *   <li>
+ *     <i>mapred.binary.partitioner.right.offset</i>: 
+ *     right offset in array (-1 by default)
+ *   </li>
+ * </ul>
+ * Like in Python, both negative and positive offsets are allowed, but
+ * the meaning is slightly different. In case of an array of length 5,
+ * for instance, the possible offsets are:
+ * <pre><code>
+ *  +---+---+---+---+---+
+ *  | B | B | B | B | B |
+ *  +---+---+---+---+---+
+ *    0   1   2   3   4
+ *   -5  -4  -3  -2  -1
+ * </code></pre>
+ * The first row of numbers gives the position of the offsets 0...4 in 
+ * the array; the second row gives the corresponding negative offsets. 
+ * Contrary to Python, the specified subarray has bytes <code>i</code> 
+ * and <code>j</code> as its first and last elements, respectively, when 
+ * <code>i</code> and <code>j</code> are the left and right offsets.
+ * 
+ * <p>For Hadoop programs written in Java, it is advisable to use one of 
+ * the following static convenience methods for setting the offsets:
+ * <ul>
+ *   <li>{@link #setOffsets}</li>
+ *   <li>{@link #setLeftOffset}</li>
+ *   <li>{@link #setRightOffset}</li>
+ * </ul>
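+ *
+ * <p>A minimal sketch, with offsets chosen for illustration, that
+ * partitions on the first four bytes of each key:
+ * <pre>
+ * BinaryPartitioner.setOffsets(job.getConfiguration(), 0, 3);
+ * job.setPartitionerClass(BinaryPartitioner.class);
+ * </pre>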
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class BinaryPartitioner<V> extends Partitioner<BinaryComparable, V> 
+  implements Configurable {
+
+  private static final String LEFT_OFFSET_PROPERTY_NAME = 
+    "mapred.binary.partitioner.left.offset";
+  private static final String RIGHT_OFFSET_PROPERTY_NAME = 
+    "mapred.binary.partitioner.right.offset";
+  
+  /**
+   * Set the subarray to be used for partitioning to 
+   * <code>bytes[left:(right+1)]</code> in Python syntax.
+   * 
+   * @param conf configuration object
+   * @param left left Python-style offset
+   * @param right right Python-style offset
+   */
+  public static void setOffsets(Configuration conf, int left, int right) {
+    conf.setInt(LEFT_OFFSET_PROPERTY_NAME, left);
+    conf.setInt(RIGHT_OFFSET_PROPERTY_NAME, right);
+  }
+  
+  /**
+   * Set the subarray to be used for partitioning to 
+   * <code>bytes[offset:]</code> in Python syntax.
+   * 
+   * @param conf configuration object
+   * @param offset left Python-style offset
+   */
+  public static void setLeftOffset(Configuration conf, int offset) {
+    conf.setInt(LEFT_OFFSET_PROPERTY_NAME, offset);
+  }
+  
+  /**
+   * Set the subarray to be used for partitioning to 
+   * <code>bytes[:(offset+1)]</code> in Python syntax.
+   * 
+   * @param conf configuration object
+   * @param offset right Python-style offset
+   */
+  public static void setRightOffset(Configuration conf, int offset) {
+    conf.setInt(RIGHT_OFFSET_PROPERTY_NAME, offset);
+  }
+  
+  
+  private Configuration conf;
+  private int leftOffset, rightOffset;
+  
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    leftOffset = conf.getInt(LEFT_OFFSET_PROPERTY_NAME, 0);
+    rightOffset = conf.getInt(RIGHT_OFFSET_PROPERTY_NAME, -1);
+  }
+  
+  public Configuration getConf() {
+    return conf;
+  }
+  
+  /** 
+   * Use (the specified slice of the array returned by) 
+   * {@link BinaryComparable#getBytes()} to partition. 
+   */
+  @Override
+  public int getPartition(BinaryComparable key, V value, int numPartitions) {
+    int length = key.getLength();
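+    // Normalize the Python-style offsets to array indices: a non-negative
+    // offset o in [0, length) maps to itself since (o + length) % length == o,
+    // while a negative offset maps to length + o (e.g. -1 becomes the last
+    // index).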
+    int leftIndex = (leftOffset + length) % length;
+    int rightIndex = (rightOffset + length) % length;
+    int hash = WritableComparator.hashBytes(key.getBytes(), 
+      leftIndex, rightIndex - leftIndex + 1);
+    return (hash & Integer.MAX_VALUE) % numPartitions;
+  }
+  
+}

Propchange: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/lib/partition/BinaryPartitioner.java
------------------------------------------------------------------------------
    svn:eol-style = native


