incubator-crunch-commits mailing list archives

From jwi...@apache.org
Subject git commit: Move org.apache.hadoop.* code to org.apache.crunch.hadoop package. Contributed by Eli Reisman.
Date Wed, 18 Jul 2012 13:14:20 GMT
Updated Branches:
  refs/heads/master c0aaabdf8 -> 4097823bd


Move org.apache.hadoop.* code to org.apache.crunch.hadoop package. Contributed by Eli Reisman.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/4097823b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/4097823b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/4097823b

Branch: refs/heads/master
Commit: 4097823bdc36adeb70b32ea39cbfb0230b08bf8a
Parents: c0aaabd
Author: jwills <jwills@apache.org>
Authored: Wed Jul 18 06:13:55 2012 -0700
Committer: jwills <jwills@apache.org>
Committed: Wed Jul 18 06:13:55 2012 -0700

----------------------------------------------------------------------
 .../lib/jobcontrol/CrunchControlledJob.java        |  339 +++++++++++
 .../mapreduce/lib/jobcontrol/CrunchJobControl.java |  297 +++++++++
 .../lib/output/CrunchMultipleOutputs.java          |  474 +++++++++++++++
 .../crunch/impl/mr/emit/MultipleOutputEmitter.java |    2 +-
 .../org/apache/crunch/impl/mr/exec/CrunchJob.java  |    2 +-
 .../org/apache/crunch/impl/mr/exec/MRExecutor.java |    4 +-
 .../crunch/impl/mr/run/CrunchTaskContext.java      |    2 +-
 .../java/org/apache/crunch/io/PathTargetImpl.java  |    2 +-
 .../org/apache/crunch/io/impl/FileTargetImpl.java  |    2 +-
 .../lib/jobcontrol/CrunchControlledJob.java        |  339 -----------
 .../mapreduce/lib/jobcontrol/CrunchJobControl.java |  295 ---------
 .../lib/output/CrunchMultipleOutputs.java          |  471 --------------
 12 files changed, 1117 insertions(+), 1112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/4097823b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
new file mode 100644
index 0000000..205e8e3
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.hadoop.mapreduce.lib.jobcontrol;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.State;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This class encapsulates a MapReduce job and its dependencies. It monitors
+ * the states of the depending jobs and updates the state of this job. A job
+ * starts in the WAITING state. If it does not have any depending jobs, or if
+ * all of its depending jobs are in the SUCCESS state, then the job state
+ * becomes READY. If any depending job fails, this job fails too. When in the
+ * READY state, the job can be submitted to Hadoop for execution, with the
+ * state changing to RUNNING. From the RUNNING state, the job can move to the
+ * SUCCESS or FAILED state, depending on the status of the job execution.
+ */
+public class CrunchControlledJob {
+
+  // A job will be in one of the following states
+  public static enum State {
+    SUCCESS, WAITING, RUNNING, READY, FAILED, DEPENDENT_FAILED
+  };
+
+  public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist";
+  protected State state;
+  protected Job job; // mapreduce job to be executed.
+  // some info for human consumption, e.g. the reason why the job failed
+  protected String message;
+  private String controlID; // assigned and used by JobControl class
+  // the jobs the current job depends on
+  private List<CrunchControlledJob> dependingJobs;
+
+  /**
+   * Construct a job.
+   * 
+   * @param job
+   *          a mapreduce job to be executed.
+   * @param dependingJobs
+   *          an array of jobs the current job depends on
+   */
+  public CrunchControlledJob(Job job, List<CrunchControlledJob> dependingJobs)
+      throws IOException {
+    this.job = job;
+    this.dependingJobs = dependingJobs;
+    this.state = State.WAITING;
+    this.controlID = "unassigned";
+    this.message = "just initialized";
+  }
+
+  /**
+   * Construct a job.
+   * 
+   * @param conf
+   *          mapred job configuration representing a job to be executed.
+   * @throws IOException
+   */
+  public CrunchControlledJob(Configuration conf) throws IOException {
+    this(new Job(conf), null);
+  }
+
+  @Override
+  public String toString() {
+    StringBuffer sb = new StringBuffer();
+    sb.append("job name:\t").append(this.job.getJobName()).append("\n");
+    sb.append("job id:\t").append(this.controlID).append("\n");
+    sb.append("job state:\t").append(this.state).append("\n");
+    sb.append("job mapred id:\t").append(this.job.getJobID()).append("\n");
+    sb.append("job message:\t").append(this.message).append("\n");
+
+    if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
+      sb.append("job has no depending job:\t").append("\n");
+    } else {
+      sb.append("job has ").append(this.dependingJobs.size())
+          .append(" dependeng jobs:\n");
+      for (int i = 0; i < this.dependingJobs.size(); i++) {
+        sb.append("\t depending job ").append(i).append(":\t");
+        sb.append((this.dependingJobs.get(i)).getJobName()).append("\n");
+      }
+    }
+    return sb.toString();
+  }
+
+  /**
+   * @return the job name of this job
+   */
+  public String getJobName() {
+    return job.getJobName();
+  }
+
+  /**
+   * Set the job name for this job.
+   * 
+   * @param jobName
+   *          the job name
+   */
+  public void setJobName(String jobName) {
+    job.setJobName(jobName);
+  }
+
+  /**
+   * @return the job ID of this job assigned by JobControl
+   */
+  public String getJobID() {
+    return this.controlID;
+  }
+
+  /**
+   * Set the job ID for this job.
+   * 
+   * @param id
+   *          the job ID
+   */
+  public void setJobID(String id) {
+    this.controlID = id;
+  }
+
+  /**
+   * @return the mapred ID of this job as assigned by the mapred framework.
+   */
+  public JobID getMapredJobID() {
+    return this.job.getJobID();
+  }
+
+  /**
+   * @return the mapreduce job
+   */
+  public synchronized Job getJob() {
+    return this.job;
+  }
+
+  /**
+   * Set the mapreduce job
+   * 
+   * @param job
+   *          the mapreduce job for this job.
+   */
+  public synchronized void setJob(Job job) {
+    this.job = job;
+  }
+
+  /**
+   * @return the state of this job
+   */
+  public synchronized State getJobState() {
+    return this.state;
+  }
+
+  /**
+   * Set the state for this job.
+   * 
+   * @param state
+   *          the new state for this job.
+   */
+  protected synchronized void setJobState(State state) {
+    this.state = state;
+  }
+
+  /**
+   * @return the message of this job
+   */
+  public synchronized String getMessage() {
+    return this.message;
+  }
+
+  /**
+   * Set the message for this job.
+   * 
+   * @param message
+   *          the message for this job.
+   */
+  public synchronized void setMessage(String message) {
+    this.message = message;
+  }
+
+  /**
+   * @return the depending jobs of this job
+   */
+  public List<CrunchControlledJob> getDependentJobs() {
+    return this.dependingJobs;
+  }
+
+  /**
+   * Add a job to this job's dependency list. Dependent jobs can only be added
+   * while a Job is waiting to run, not during or afterwards.
+   * 
+   * @param dependingJob
+   *          Job that this Job depends on.
+   * @return <tt>true</tt> if the Job was added.
+   */
+  public synchronized boolean addDependingJob(CrunchControlledJob dependingJob) {
+    if (this.state == State.WAITING) { // only allowed to add jobs when waiting
+      if (this.dependingJobs == null) {
+        this.dependingJobs = new ArrayList<CrunchControlledJob>();
+      }
+      return this.dependingJobs.add(dependingJob);
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * @return true if this job is in a complete state
+   */
+  public synchronized boolean isCompleted() {
+    return this.state == State.FAILED || this.state == State.DEPENDENT_FAILED
+        || this.state == State.SUCCESS;
+  }
+
+  /**
+   * @return true if this job is in READY state
+   */
+  public synchronized boolean isReady() {
+    return this.state == State.READY;
+  }
+
+  public void killJob() throws IOException, InterruptedException {
+    job.killJob();
+  }
+
+  /**
+   * Check the state of this running job. The state may remain the same, or
+   * become SUCCESS or FAILED.
+   */
+  protected void checkRunningState() throws IOException, InterruptedException {
+    try {
+      if (job.isComplete()) {
+        if (job.isSuccessful()) {
+          this.state = State.SUCCESS;
+        } else {
+          this.state = State.FAILED;
+          this.message = "Job failed!";
+        }
+      }
+    } catch (IOException ioe) {
+      this.state = State.FAILED;
+      this.message = StringUtils.stringifyException(ioe);
+      try {
+        if (job != null) {
+          job.killJob();
+        }
+      } catch (IOException e) {
+      }
+    }
+  }
+
+  /**
+   * Check and update the state of this job. The state changes depending on its
+   * current state and the states of the depending jobs.
+   */
+  synchronized State checkState() throws IOException, InterruptedException {
+    if (this.state == State.RUNNING) {
+      checkRunningState();
+    }
+    if (this.state != State.WAITING) {
+      return this.state;
+    }
+    if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
+      this.state = State.READY;
+      return this.state;
+    }
+    CrunchControlledJob pred = null;
+    int n = this.dependingJobs.size();
+    for (int i = 0; i < n; i++) {
+      pred = this.dependingJobs.get(i);
+      State s = pred.checkState();
+      if (s == State.WAITING || s == State.READY || s == State.RUNNING) {
+        break; // a pred is still not completed, continue in WAITING
+        // state
+      }
+      if (s == State.FAILED || s == State.DEPENDENT_FAILED) {
+        this.state = State.DEPENDENT_FAILED;
+        this.message = "depending job " + i + " with jobID " + pred.getJobID()
+            + " failed. " + pred.getMessage();
+        break;
+      }
+      // pred must be in success state
+      if (i == n - 1) {
+        this.state = State.READY;
+      }
+    }
+
+    return this.state;
+  }
+
+  /**
+   * Submit this job to mapred. The state becomes RUNNING if submission is
+   * successful, FAILED otherwise.
+   */
+  protected synchronized void submit() {
+    try {
+      Configuration conf = job.getConfiguration();
+      if (conf.getBoolean(CREATE_DIR, false)) {
+        FileSystem fs = FileSystem.get(conf);
+        Path inputPaths[] = FileInputFormat.getInputPaths(job);
+        for (int i = 0; i < inputPaths.length; i++) {
+          if (!fs.exists(inputPaths[i])) {
+            try {
+              fs.mkdirs(inputPaths[i]);
+            } catch (IOException e) {
+
+            }
+          }
+        }
+      }
+      job.submit();
+      this.state = State.RUNNING;
+    } catch (Exception ioe) {
+      this.state = State.FAILED;
+      this.message = StringUtils.stringifyException(ioe);
+    }
+  }
+
+}
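
As a quick orientation, here is a minimal sketch of how CrunchControlledJob
instances could be wired together. This is a hedged example, not part of the
commit: the job names and the bare Configuration are illustrative only.

import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

import com.google.common.collect.Lists;

public class ControlledJobSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // A job with no dependencies starts in WAITING and becomes READY the
    // next time the control loop checks its state.
    CrunchControlledJob extract = new CrunchControlledJob(new Job(conf), null);
    extract.setJobName("extract"); // illustrative name

    // A job with dependencies stays in WAITING until all of its depending
    // jobs reach SUCCESS; if any of them fails, it becomes DEPENDENT_FAILED.
    CrunchControlledJob aggregate = new CrunchControlledJob(
        new Job(conf), Lists.newArrayList(extract));
    aggregate.setJobName("aggregate"); // illustrative name
  }
}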

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/4097823b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
new file mode 100644
index 0000000..9e9af85
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.hadoop.mapreduce.lib.jobcontrol;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.State;
+
+/**
+ * This class encapsulates a set of MapReduce jobs and their dependencies.
+ * 
+ * It tracks the states of the jobs by placing them into different tables
+ * according to their states.
+ * 
+ * This class provides APIs for the client app to add a job to the group and to
+ * get the jobs in the group in different states. When a job is added, an ID
+ * unique to the group is assigned to the job.
+ * 
+ * This class has a thread that submits jobs when they become ready, monitors
+ * the states of the running jobs, and updates the states of jobs based on the
+ * state changes of the jobs they depend on. The class provides APIs for
+ * suspending, resuming, and stopping the thread.
+ * 
+ * TODO This is mostly a copy of the JobControl class in Hadoop MapReduce core. 
+ * Once the location and interface of that class are more stable in CDH, this
+ * class should be removed entirely in favor of the hadoop-core version.
+ */
+public class CrunchJobControl implements Runnable {
+
+  // The thread can be in one of the following states
+  public static enum ThreadState {
+    RUNNING, SUSPENDED, STOPPED, STOPPING, READY
+  };
+
+  private ThreadState runnerState; // the thread state
+
+  private Map<String, CrunchControlledJob> waitingJobs;
+  private Map<String, CrunchControlledJob> readyJobs;
+  private Map<String, CrunchControlledJob> runningJobs;
+  private Map<String, CrunchControlledJob> successfulJobs;
+  private Map<String, CrunchControlledJob> failedJobs;
+
+  private long nextJobID;
+  private String groupName;
+
+  /**
+   * Construct a job control for a group of jobs.
+   * 
+   * @param groupName
+   *          a name identifying this group
+   */
+  public CrunchJobControl(String groupName) {
+    this.waitingJobs = new Hashtable<String, CrunchControlledJob>();
+    this.readyJobs = new Hashtable<String, CrunchControlledJob>();
+    this.runningJobs = new Hashtable<String, CrunchControlledJob>();
+    this.successfulJobs = new Hashtable<String, CrunchControlledJob>();
+    this.failedJobs = new Hashtable<String, CrunchControlledJob>();
+    this.nextJobID = -1;
+    this.groupName = groupName;
+    this.runnerState = ThreadState.READY;
+  }
+
+  private static List<CrunchControlledJob> toList(Map<String, CrunchControlledJob> jobs) {
+    ArrayList<CrunchControlledJob> retv = new ArrayList<CrunchControlledJob>();
+    synchronized (jobs) {
+      for (CrunchControlledJob job : jobs.values()) {
+        retv.add(job);
+      }
+    }
+    return retv;
+  }
+
+  /**
+   * @return the jobs in the waiting state
+   */
+  public List<CrunchControlledJob> getWaitingJobList() {
+    return toList(this.waitingJobs);
+  }
+
+  /**
+   * @return the jobs in the running state
+   */
+  public List<CrunchControlledJob> getRunningJobList() {
+    return toList(this.runningJobs);
+  }
+
+  /**
+   * @return the jobs in the ready state
+   */
+  public List<CrunchControlledJob> getReadyJobsList() {
+    return toList(this.readyJobs);
+  }
+
+  /**
+   * @return the jobs in the success state
+   */
+  public List<CrunchControlledJob> getSuccessfulJobList() {
+    return toList(this.successfulJobs);
+  }
+
+  public List<CrunchControlledJob> getFailedJobList() {
+    return toList(this.failedJobs);
+  }
+
+  private String getNextJobID() {
+    nextJobID += 1;
+    return this.groupName + this.nextJobID;
+  }
+
+  private static void addToQueue(CrunchControlledJob aJob,
+      Map<String, CrunchControlledJob> queue) {
+    synchronized (queue) {
+      queue.put(aJob.getJobID(), aJob);
+    }
+  }
+
+  private void addToQueue(CrunchControlledJob aJob) {
+    Map<String, CrunchControlledJob> queue = getQueue(aJob.getJobState());
+    addToQueue(aJob, queue);
+  }
+
+  private Map<String, CrunchControlledJob> getQueue(State state) {
+    Map<String, CrunchControlledJob> retv = null;
+    if (state == State.WAITING) {
+      retv = this.waitingJobs;
+    } else if (state == State.READY) {
+      retv = this.readyJobs;
+    } else if (state == State.RUNNING) {
+      retv = this.runningJobs;
+    } else if (state == State.SUCCESS) {
+      retv = this.successfulJobs;
+    } else if (state == State.FAILED || state == State.DEPENDENT_FAILED) {
+      retv = this.failedJobs;
+    }
+    return retv;
+  }
+
+  /**
+   * Add a new job.
+   * 
+   * @param aJob
+   *          the new job
+   */
+  synchronized public String addJob(CrunchControlledJob aJob) {
+    String id = this.getNextJobID();
+    aJob.setJobID(id);
+    aJob.setJobState(State.WAITING);
+    this.addToQueue(aJob);
+    return id;
+  }
+
+  /**
+   * Add a collection of jobs
+   * 
+   * @param jobs
+   */
+  public void addJobCollection(Collection<CrunchControlledJob> jobs) {
+    for (CrunchControlledJob job : jobs) {
+      addJob(job);
+    }
+  }
+
+  /**
+   * @return the thread state
+   */
+  public ThreadState getThreadState() {
+    return this.runnerState;
+  }
+
+  /**
+   * Set the thread state to STOPPING so that the thread will stop when it wakes
+   * up.
+   */
+  public void stop() {
+    this.runnerState = ThreadState.STOPPING;
+  }
+
+  /**
+   * Suspend the running thread.
+   */
+  public void suspend() {
+    if (this.runnerState == ThreadState.RUNNING) {
+      this.runnerState = ThreadState.SUSPENDED;
+    }
+  }
+
+  /**
+   * Resume the suspended thread.
+   */
+  public void resume() {
+    if (this.runnerState == ThreadState.SUSPENDED) {
+      this.runnerState = ThreadState.RUNNING;
+    }
+  }
+
+  synchronized private void checkRunningJobs() throws IOException,
+      InterruptedException {
+
+    Map<String, CrunchControlledJob> oldJobs = null;
+    oldJobs = this.runningJobs;
+    this.runningJobs = new Hashtable<String, CrunchControlledJob>();
+
+    for (CrunchControlledJob nextJob : oldJobs.values()) {
+      nextJob.checkState();
+      this.addToQueue(nextJob);
+    }
+  }
+
+  synchronized private void checkWaitingJobs() throws IOException,
+      InterruptedException {
+    Map<String, CrunchControlledJob> oldJobs = null;
+    oldJobs = this.waitingJobs;
+    this.waitingJobs = new Hashtable<String, CrunchControlledJob>();
+
+    for (CrunchControlledJob nextJob : oldJobs.values()) {
+      nextJob.checkState();
+      this.addToQueue(nextJob);
+    }
+  }
+
+  synchronized private void startReadyJobs() {
+    Map<String, CrunchControlledJob> oldJobs = null;
+    oldJobs = this.readyJobs;
+    this.readyJobs = new Hashtable<String, CrunchControlledJob>();
+
+    for (CrunchControlledJob nextJob : oldJobs.values()) {
+      // Submitting Job to Hadoop
+      nextJob.submit();
+      this.addToQueue(nextJob);
+    }
+  }
+
+  synchronized public boolean allFinished() {
+    return this.waitingJobs.size() == 0 && this.readyJobs.size() == 0
+        && this.runningJobs.size() == 0;
+  }
+
+  /**
+   * The main loop for the thread. On each iteration it checks the states of
+   * the running jobs, updates the states of the waiting jobs, and submits
+   * the jobs that are in the ready state.
+   */
+  public void run() {
+    this.runnerState = ThreadState.RUNNING;
+    while (true) {
+      while (this.runnerState == ThreadState.SUSPENDED) {
+        try {
+          Thread.sleep(5000);
+        } catch (Exception e) {
+
+        }
+      }
+      try {
+        checkRunningJobs();
+        checkWaitingJobs();
+        startReadyJobs();
+      } catch (Exception e) {
+        this.runnerState = ThreadState.STOPPED;
+      }
+      if (this.runnerState != ThreadState.RUNNING
+          && this.runnerState != ThreadState.SUSPENDED) {
+        break;
+      }
+      try {
+        Thread.sleep(5000);
+      } catch (Exception e) {
+
+      }
+      if (this.runnerState != ThreadState.RUNNING
+          && this.runnerState != ThreadState.SUSPENDED) {
+        break;
+      }
+    }
+    this.runnerState = ThreadState.STOPPED;
+  }
+
+}
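
A hedged sketch of how a caller might drive the class above; the group name
and the one-second polling interval are illustrative, not part of this commit.
It relies only on methods shown in the diff (addJob, allFinished, stop, and
the Runnable control loop):

import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl;

public class JobControlSketch {
  public static void runAll(Iterable<CrunchControlledJob> jobs)
      throws InterruptedException {
    CrunchJobControl control = new CrunchJobControl("pipeline"); // illustrative group name
    for (CrunchControlledJob job : jobs) {
      control.addJob(job); // assigns a group-unique ID and queues the job as WAITING
    }

    // The control loop runs on its own thread: it submits READY jobs and
    // re-checks RUNNING and WAITING jobs roughly every five seconds.
    Thread runner = new Thread(control);
    runner.setDaemon(true);
    runner.start();

    while (!control.allFinished()) {
      Thread.sleep(1000);
    }
    control.stop(); // sets STOPPING so the loop exits at its next wakeup
  }
}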

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/4097823b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java
new file mode 100644
index 0000000..5d0863d
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java
@@ -0,0 +1,474 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.apache.crunch.impl.mr.run.TaskAttemptContextFactory;
+
+/**
+ * The MultipleOutputs class simplifies writing output data 
+ * to multiple outputs
+ * 
+ * <p> 
+ * Case one: writing to additional outputs other than the job default output.
+ *
+ * Each additional output, or named output, may be configured with its own
+ * <code>OutputFormat</code>, with its own key class and with its own value
+ * class.
+ * 
+ * <p>
+ * Case two: writing data to different files provided by the user.
+ * </p>
+ * 
+ * <p>
+ * MultipleOutputs supports counters; by default they are disabled. The
+ * counters group is the {@link CrunchMultipleOutputs} class name. The names
+ * of the counters are the same as the output names, and they count the
+ * number of records written to each named output.
+ * </p>
+ * 
+ * Usage pattern for job submission:
+ * <pre>
+ *
+ * Job job = new Job();
+ *
+ * FileInputFormat.setInputPath(job, inDir);
+ * FileOutputFormat.setOutputPath(job, outDir);
+ *
+ * job.setMapperClass(MOMap.class);
+ * job.setReducerClass(MOReduce.class);
+ * ...
+ *
+ * // Defines additional single text based output 'text' for the job
+ * MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
+ * LongWritable.class, Text.class);
+ *
+ * // Defines additional sequence-file based output 'sequence' for the job
+ * MultipleOutputs.addNamedOutput(job, "seq",
+ *   SequenceFileOutputFormat.class,
+ *   LongWritable.class, Text.class);
+ * ...
+ *
+ * job.waitForCompletion(true);
+ * ...
+ * </pre>
+ * <p>
+ * Usage in Reducer:
+ * <pre>
+ * <K, V> String generateFileName(K k, V v) {
+ *   return k.toString() + "_" + v.toString();
+ * }
+ * 
+ * public class MOReduce extends
+ *   Reducer&lt;WritableComparable, Writable,WritableComparable, Writable&gt; {
+ * private MultipleOutputs mos;
+ * public void setup(Context context) {
+ * ...
+ * mos = new MultipleOutputs(context);
+ * }
+ *
+ * public void reduce(WritableComparable key, Iterator&lt;Writable&gt; values,
+ * Context context)
+ * throws IOException {
+ * ...
+ * mos.write("text", , key, new Text("Hello"));
+ * mos.write("seq", LongWritable(1), new Text("Bye"), "seq_a");
+ * mos.write("seq", LongWritable(2), key, new Text("Chau"), "seq_b");
+ * mos.write(key, new Text("value"), generateFileName(key, new Text("value")));
+ * ...
+ * }
+ *
+ * public void cleanup(Context) throws IOException {
+ * mos.close();
+ * ...
+ * }
+ *
+ * }
+ * </pre>
+ */
+public class CrunchMultipleOutputs<KEYOUT, VALUEOUT> {
+
+  private static final String MULTIPLE_OUTPUTS = "mapreduce.multipleoutputs";
+
+  private static final String MO_PREFIX = 
+    "mapreduce.multipleoutputs.namedOutput.";
+
+  private static final String PART = "part";
+  private static final String FORMAT = ".format";
+  private static final String KEY = ".key";
+  private static final String VALUE = ".value";
+  private static final String COUNTERS_ENABLED = 
+    "mapreduce.multipleoutputs.counters";
+
+  private static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
+  
+  /**
+   * Counters group used by the counters of MultipleOutputs.
+   */
+  private static final String COUNTERS_GROUP = CrunchMultipleOutputs.class.getName();
+  
+  /**
+   * Cache for the taskContexts
+   */
+  private Map<String, TaskAttemptContext> taskContexts = new HashMap<String, TaskAttemptContext>();
+
+  /**
+   * Checks if a named output name is a valid token.
+   *
+   * @param namedOutput named output name
+   * @throws IllegalArgumentException if the output name is not valid.
+   */
+  private static void checkTokenName(String namedOutput) {
+    if (namedOutput == null || namedOutput.length() == 0) {
+      throw new IllegalArgumentException(
+        "Name cannot be NULL or emtpy");
+    }
+    for (char ch : namedOutput.toCharArray()) {
+      if ((ch >= 'A') && (ch <= 'Z')) {
+        continue;
+      }
+      if ((ch >= 'a') && (ch <= 'z')) {
+        continue;
+      }
+      if ((ch >= '0') && (ch <= '9')) {
+        continue;
+      }
+      throw new IllegalArgumentException(
+        "Name cannot be have a '" + ch + "' char");
+    }
+  }
+
+  /**
+   * Checks if the output name is valid.
+   *
+   * The name cannot be the name used for the default output.
+   * @param outputPath base output name
+   * @throws IllegalArgumentException if the output name is not valid.
+   */
+  private static void checkBaseOutputPath(String outputPath) {
+    if (outputPath.equals(PART)) {
+      throw new IllegalArgumentException("output name cannot be 'part'");
+    }
+  }
+  
+  /**
+   * Checks if a named output name is valid.
+   *
+   * @param namedOutput named output name
+   * @throws IllegalArgumentException if the output name is not valid.
+   */
+  private static void checkNamedOutputName(JobContext job,
+      String namedOutput, boolean alreadyDefined) {
+    checkTokenName(namedOutput);
+    checkBaseOutputPath(namedOutput);
+    List<String> definedChannels = getNamedOutputsList(job);
+    if (alreadyDefined && definedChannels.contains(namedOutput)) {
+      throw new IllegalArgumentException("Named output '" + namedOutput +
+        "' already alreadyDefined");
+    } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) {
+      throw new IllegalArgumentException("Named output '" + namedOutput +
+        "' not defined");
+    }
+  }
+
+  // Returns list of channel names.
+  private static List<String> getNamedOutputsList(JobContext job) {
+    List<String> names = new ArrayList<String>();
+    StringTokenizer st = new StringTokenizer(
+      job.getConfiguration().get(MULTIPLE_OUTPUTS, ""), " ");
+    while (st.hasMoreTokens()) {
+      names.add(st.nextToken());
+    }
+    return names;
+  }
+
+  // Returns the named output OutputFormat.
+  @SuppressWarnings("unchecked")
+  private static Class<? extends OutputFormat<?, ?>> getNamedOutputFormatClass(
+    JobContext job, String namedOutput) {
+    return (Class<? extends OutputFormat<?, ?>>)
+      job.getConfiguration().getClass(MO_PREFIX + namedOutput + FORMAT, null,
+      OutputFormat.class);
+  }
+
+  // Returns the key class for a named output.
+  private static Class<?> getNamedOutputKeyClass(JobContext job,
+                                                String namedOutput) {
+    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + KEY, null,
+      Object.class);
+  }
+
+  // Returns the value class for a named output.
+  private static Class<?> getNamedOutputValueClass(
+      JobContext job, String namedOutput) {
+    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + VALUE,
+      null, Object.class);
+  }
+
+  /**
+   * Adds a named output for the job.
+   * <p/>
+   *
+   * @param job               job to add the named output
+   * @param namedOutput       named output name; it has to be a word of
+   *                          letters and numbers only, and cannot be the word
+   *                          'part', which is reserved for the default output.
+   * @param outputFormatClass OutputFormat class.
+   * @param keyClass          key class
+   * @param valueClass        value class
+   */
+  public static void addNamedOutput(Job job, String namedOutput,
+      Class<? extends OutputFormat> outputFormatClass,
+      Class<?> keyClass, Class<?> valueClass) {
+    checkNamedOutputName(job, namedOutput, true);
+    Configuration conf = job.getConfiguration();
+    conf.set(MULTIPLE_OUTPUTS,
+      conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput);
+    conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
+      OutputFormat.class);
+    conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class);
+    conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class);
+  }
+
+  /**
+   * Enables or disables counters for the named outputs.
+   * 
+   * The counters group is the {@link CrunchMultipleOutputs} class name.
+   * The names of the counters are the same as the named outputs. These
+   * counters count the number of records written to each output name.
+   * By default these counters are disabled.
+   *
+   * @param job     the job to enable counters for
+   * @param enabled indicates if the counters will be enabled or not.
+   */
+  public static void setCountersEnabled(Job job, boolean enabled) {
+    job.getConfiguration().setBoolean(COUNTERS_ENABLED, enabled);
+  }
+
+  /**
+   * Returns whether the counters for the named outputs are enabled.
+   * By default these counters are disabled.
+   *
+   * @param job    the job 
+   * @return TRUE if the counters are enabled, FALSE if they are disabled.
+   */
+  public static boolean getCountersEnabled(JobContext job) {
+    return job.getConfiguration().getBoolean(COUNTERS_ENABLED, false);
+  }
+
+  /**
+   * Wraps RecordWriter to increment counters. 
+   */
+  @SuppressWarnings("unchecked")
+  private static class RecordWriterWithCounter extends RecordWriter {
+    private RecordWriter writer;
+    private String counterName;
+    private TaskInputOutputContext context;
+
+    public RecordWriterWithCounter(RecordWriter writer, String counterName,
+                                   TaskInputOutputContext context) {
+      this.writer = writer;
+      this.counterName = counterName;
+      this.context = context;
+    }
+
+    @SuppressWarnings({"unchecked"})
+    public void write(Object key, Object value) 
+        throws IOException, InterruptedException {
+      context.getCounter(COUNTERS_GROUP, counterName).increment(1);
+      writer.write(key, value);
+    }
+
+    public void close(TaskAttemptContext context) 
+        throws IOException, InterruptedException {
+      writer.close(context);
+    }
+  }
+
+  // instance code, to be used from Mapper/Reducer code
+
+  private TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context;
+  private Set<String> namedOutputs;
+  private Map<String, RecordWriter<?, ?>> recordWriters;
+  private boolean countersEnabled;
+  
+  /**
+   * Creates and initializes multiple outputs support; it should be
+   * instantiated in the Mapper/Reducer setup method.
+   *
+   * @param context the TaskInputOutputContext object
+   */
+  public CrunchMultipleOutputs(
+      TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
+    this.context = context;
+    namedOutputs = Collections.unmodifiableSet(
+      new HashSet<String>(CrunchMultipleOutputs.getNamedOutputsList(context)));
+    recordWriters = new HashMap<String, RecordWriter<?, ?>>();
+    countersEnabled = getCountersEnabled(context);
+  }
+
+  /**
+   * Write key and value to the namedOutput.
+   *
+   * Output path is a unique file generated for the namedOutput.
+   * For example, {namedOutput}-(m|r)-{part-number}
+   * 
+   * @param namedOutput the named output name
+   * @param key         the key
+   * @param value       the value
+   */
+  @SuppressWarnings("unchecked")
+  public <K, V> void write(String namedOutput, K key, V value)
+      throws IOException, InterruptedException {
+    write(namedOutput, key, value, namedOutput);
+  }
+
+  /**
+   * Write key and value to baseOutputPath using the namedOutput.
+   * 
+   * @param namedOutput    the named output name
+   * @param key            the key
+   * @param value          the value
+   * @param baseOutputPath base-output path to write the record to.
+   * Note: the framework will generate a unique filename for the baseOutputPath.
+   */
+  @SuppressWarnings("unchecked")
+  public <K, V> void write(String namedOutput, K key, V value,
+      String baseOutputPath) throws IOException, InterruptedException {
+    checkNamedOutputName(context, namedOutput, false);
+    checkBaseOutputPath(baseOutputPath);
+    if (!namedOutputs.contains(namedOutput)) {
+      throw new IllegalArgumentException("Undefined named output '" +
+        namedOutput + "'");
+    }
+    TaskAttemptContext taskContext = getContext(namedOutput);
+    getRecordWriter(taskContext, baseOutputPath).write(key, value);
+  }
+
+  /**
+   * Write key and value to an output file name.
+   * 
+   * Gets the record writer from the job's output format. The job's output
+   * format should be a FileOutputFormat.
+   * 
+   * @param key       the key
+   * @param value     the value
+   * @param baseOutputPath base-output path to write the record to.
+   * Note: the framework will generate a unique filename for the baseOutputPath.
+   */
+  @SuppressWarnings("unchecked")
+  public void write(KEYOUT key, VALUEOUT value, String baseOutputPath) 
+      throws IOException, InterruptedException {
+    checkBaseOutputPath(baseOutputPath);
+    TaskAttemptContext taskContext = TaskAttemptContextFactory.create(
+      context.getConfiguration(), context.getTaskAttemptID());
+    getRecordWriter(taskContext, baseOutputPath).write(key, value);
+  }
+
+  // By being synchronized, CrunchMultipleOutputs can be used with a
+  // MultithreadedMapper.
+  @SuppressWarnings("unchecked")
+  private synchronized RecordWriter getRecordWriter(
+      TaskAttemptContext taskContext, String baseFileName) 
+      throws IOException, InterruptedException {
+    
+    // look for record-writer in the cache
+    RecordWriter writer = recordWriters.get(baseFileName);
+    
+    // If not in cache, create a new one
+    if (writer == null) {
+      // get the record writer from context output format
+      taskContext.getConfiguration().set(BASE_OUTPUT_NAME, baseFileName);
+      try {
+        writer = ((OutputFormat) ReflectionUtils.newInstance(
+          taskContext.getOutputFormatClass(), taskContext.getConfiguration()))
+          .getRecordWriter(taskContext);
+      } catch (ClassNotFoundException e) {
+        throw new IOException(e);
+      }
+ 
+      // if counters are enabled, wrap the writer with context 
+      // to increment counters 
+      if (countersEnabled) {
+        writer = new RecordWriterWithCounter(writer, baseFileName, context);
+      }
+      
+      // add the record-writer to the cache
+      recordWriters.put(baseFileName, writer);
+    }
+    return writer;
+  }
+
+   // Create a taskAttemptContext for the named output with 
+   // output format and output key/value types put in the context
+  private TaskAttemptContext getContext(String nameOutput) throws IOException {
+
+    TaskAttemptContext taskContext = taskContexts.get(nameOutput);
+
+    if (taskContext != null) {
+      return taskContext;
+    }
+
+    // The following trick leverages the instantiation of a record writer via
+    // the job thus supporting arbitrary output formats.
+    Job job = new Job(context.getConfiguration());
+    job.getConfiguration().set("crunch.namedoutput", nameOutput);
+    job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
+    job.setOutputKeyClass(getNamedOutputKeyClass(context, nameOutput));
+    job.setOutputValueClass(getNamedOutputValueClass(context, nameOutput));
+    taskContext = TaskAttemptContextFactory.create(
+      job.getConfiguration(), context.getTaskAttemptID());
+    
+    taskContexts.put(nameOutput, taskContext);
+    
+    return taskContext;
+  }
+  
+  /**
+   * Closes all the opened outputs.
+   * 
+   * This should be called from the cleanup method of a map/reduce task.
+   * If overridden, subclasses must invoke <code>super.close()</code> at the
+   * end of their <code>close()</code> method.
+   * 
+   */
+  @SuppressWarnings("unchecked")
+  public void close() throws IOException, InterruptedException {
+    for (RecordWriter writer : recordWriters.values()) {
+      writer.close(context);
+    }
+  }
+}
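
The usage pattern in the Javadoc above shows the write path; as a complement,
here is a hedged sketch of the matching job setup, also enabling the
per-output record counters described in the class comment. The output name
"errors" and the text output format are illustrative, not part of this commit:

import org.apache.crunch.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class NamedOutputSetupSketch {
  public static void configure(Job job) {
    // Register a named output; the name must be letters and digits only
    // and may not be "part", which is reserved for the default output.
    CrunchMultipleOutputs.addNamedOutput(job, "errors",
        TextOutputFormat.class, LongWritable.class, Text.class);

    // Per-named-output record counters are disabled by default; when
    // enabled they appear under the CrunchMultipleOutputs counter group.
    CrunchMultipleOutputs.setCountersEnabled(job, true);
  }
}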

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/4097823b/crunch/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java b/crunch/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java
index 9e3a224..e068e3c 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
 import org.apache.crunch.types.Converter;
-import org.apache.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
+import org.apache.crunch.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
 
 public class MultipleOutputEmitter<T, K, V> implements Emitter<T> {
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/4097823b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java
index fbfcf05..8327f58 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
+import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.collect.Lists;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/4097823b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
index c8c47b6..9811600 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
@@ -22,8 +22,8 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.PipelineResult;
-import org.apache.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
-import org.apache.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl;
+import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
+import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl;
 
 import com.google.common.collect.Lists;
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/4097823b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
index 8093cd5..5d501a2 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
@@ -25,7 +25,7 @@ import org.apache.crunch.util.DistCache;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
+import org.apache.crunch.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
 
 public class CrunchTaskContext {
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/4097823b/crunch/src/main/java/org/apache/crunch/io/PathTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/PathTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/PathTargetImpl.java
index 14d5a66..beb9ce8 100644
--- a/crunch/src/main/java/org/apache/crunch/io/PathTargetImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/io/PathTargetImpl.java
@@ -18,10 +18,10 @@
 package org.apache.crunch.io;
 
 import org.apache.crunch.types.PType;
+import org.apache.crunch.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 public abstract class PathTargetImpl implements PathTarget {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/4097823b/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
index 196e38c..22df7f8 100644
--- a/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -19,13 +19,13 @@ package org.apache.crunch.io.impl;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.crunch.SourceTarget;
+import org.apache.crunch.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
 import org.apache.crunch.io.OutputHandler;
 import org.apache.crunch.io.PathTarget;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 public class FileTargetImpl implements PathTarget {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/4097823b/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
deleted file mode 100644
index a452d2e..0000000
--- a/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapreduce.lib.jobcontrol;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.util.StringUtils;
-
-/**
- * This class encapsulates a MapReduce job and its dependencies. It monitors
- * the states of the depending jobs and updates the state of this job. A job
- * starts in the WAITING state. If it does not have any depending jobs, or if
- * all of its depending jobs are in the SUCCESS state, then the job state
- * becomes READY. If any depending job fails, this job fails too. When in the
- * READY state, the job can be submitted to Hadoop for execution, with the
- * state changing to RUNNING. From the RUNNING state, the job can move to the
- * SUCCESS or FAILED state, depending on the status of the job execution.
- */
-public class CrunchControlledJob {
-
-  // A job will be in one of the following states
-  public static enum State {
-    SUCCESS,
-    WAITING,
-    RUNNING,
-    READY,
-    FAILED,
-    DEPENDENT_FAILED
-  };
-
-  public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist";
-  protected State state;
-  protected Job job; // mapreduce job to be executed.
-  // some info for human consumption, e.g. the reason why the job failed
-  protected String message;
-  private String controlID; // assigned and used by JobControl class
-  // the jobs the current job depends on
-  private List<CrunchControlledJob> dependingJobs;
-
-  /**
-   * Construct a job.
-   * 
-   * @param job
-   *          a mapreduce job to be executed.
-   * @param dependingJobs
-   *          an array of jobs the current job depends on
-   */
-  public CrunchControlledJob(Job job, List<CrunchControlledJob> dependingJobs) throws IOException {
-    this.job = job;
-    this.dependingJobs = dependingJobs;
-    this.state = State.WAITING;
-    this.controlID = "unassigned";
-    this.message = "just initialized";
-  }
-
-  /**
-   * Construct a job.
-   * 
-   * @param conf
-   *          mapred job configuration representing a job to be executed.
-   * @throws IOException
-   */
-  public CrunchControlledJob(Configuration conf) throws IOException {
-    this(new Job(conf), null);
-  }
-
-  @Override
-  public String toString() {
-    StringBuffer sb = new StringBuffer();
-    sb.append("job name:\t").append(this.job.getJobName()).append("\n");
-    sb.append("job id:\t").append(this.controlID).append("\n");
-    sb.append("job state:\t").append(this.state).append("\n");
-    sb.append("job mapred id:\t").append(this.job.getJobID()).append("\n");
-    sb.append("job message:\t").append(this.message).append("\n");
-
-    if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
-      sb.append("job has no depending job:\t").append("\n");
-    } else {
-      sb.append("job has ").append(this.dependingJobs.size()).append(" dependeng jobs:\n");
-      for (int i = 0; i < this.dependingJobs.size(); i++) {
-        sb.append("\t depending job ").append(i).append(":\t");
-        sb.append((this.dependingJobs.get(i)).getJobName()).append("\n");
-      }
-    }
-    return sb.toString();
-  }
-
-  /**
-   * @return the job name of this job
-   */
-  public String getJobName() {
-    return job.getJobName();
-  }
-
-  /**
-   * Set the job name for this job.
-   * 
-   * @param jobName
-   *          the job name
-   */
-  public void setJobName(String jobName) {
-    job.setJobName(jobName);
-  }
-
-  /**
-   * @return the job ID of this job assigned by JobControl
-   */
-  public String getJobID() {
-    return this.controlID;
-  }
-
-  /**
-   * Set the job ID for this job.
-   * 
-   * @param id
-   *          the job ID
-   */
-  public void setJobID(String id) {
-    this.controlID = id;
-  }
-
-  /**
-   * @return the mapred ID of this job as assigned by the mapred framework.
-   */
-  public JobID getMapredJobID() {
-    return this.job.getJobID();
-  }
-
-  /**
-   * @return the mapreduce job
-   */
-  public synchronized Job getJob() {
-    return this.job;
-  }
-
-  /**
-   * Set the mapreduce job
-   * 
-   * @param job
-   *          the mapreduce job for this job.
-   */
-  public synchronized void setJob(Job job) {
-    this.job = job;
-  }
-
-  /**
-   * @return the state of this job
-   */
-  public synchronized State getJobState() {
-    return this.state;
-  }
-
-  /**
-   * Set the state for this job.
-   * 
-   * @param state
-   *          the new state for this job.
-   */
-  protected synchronized void setJobState(State state) {
-    this.state = state;
-  }
-
-  /**
-   * @return the message of this job
-   */
-  public synchronized String getMessage() {
-    return this.message;
-  }
-
-  /**
-   * Set the message for this job.
-   * 
-   * @param message
-   *          the message for this job.
-   */
-  public synchronized void setMessage(String message) {
-    this.message = message;
-  }
-
-  /**
-   * @return the depending jobs of this job
-   */
-  public List<CrunchControlledJob> getDependentJobs() {
-    return this.dependingJobs;
-  }
-
-  /**
-   * Add a job to this job's dependency list. Dependent jobs can only be added
-   * while a Job is waiting to run, not during or afterwards.
-   * 
-   * @param dependingJob
-   *          Job that this Job depends on.
-   * @return <tt>true</tt> if the Job was added.
-   */
-  public synchronized boolean addDependingJob(CrunchControlledJob dependingJob) {
-    if (this.state == State.WAITING) { // only allowed to add jobs when waiting
-      if (this.dependingJobs == null) {
-        this.dependingJobs = new ArrayList<CrunchControlledJob>();
-      }
-      return this.dependingJobs.add(dependingJob);
-    } else {
-      return false;
-    }
-  }
-
-  /**
-   * @return true if this job is in a complete state
-   */
-  public synchronized boolean isCompleted() {
-    return this.state == State.FAILED || this.state == State.DEPENDENT_FAILED || this.state == State.SUCCESS;
-  }
-
-  /**
-   * @return true if this job is in READY state
-   */
-  public synchronized boolean isReady() {
-    return this.state == State.READY;
-  }
-
-  public void killJob() throws IOException, InterruptedException {
-    job.killJob();
-  }
-
-  /**
-   * Check the state of this running job. The state may remain the same, or
-   * become SUCCESS or FAILED.
-   */
-  protected void checkRunningState() throws IOException, InterruptedException {
-    try {
-      if (job.isComplete()) {
-        if (job.isSuccessful()) {
-          this.state = State.SUCCESS;
-        } else {
-          this.state = State.FAILED;
-          this.message = "Job failed!";
-        }
-      }
-    } catch (IOException ioe) {
-      this.state = State.FAILED;
-      this.message = StringUtils.stringifyException(ioe);
-      try {
-        if (job != null) {
-          job.killJob();
-        }
-      } catch (IOException e) {
-      }
-    }
-  }
-
-  /**
-   * Check and update the state of this job. The state changes depending on its
-   * current state and the states of the depending jobs.
-   */
-  synchronized State checkState() throws IOException, InterruptedException {
-    if (this.state == State.RUNNING) {
-      checkRunningState();
-    }
-    if (this.state != State.WAITING) {
-      return this.state;
-    }
-    if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
-      this.state = State.READY;
-      return this.state;
-    }
-    CrunchControlledJob pred = null;
-    int n = this.dependingJobs.size();
-    for (int i = 0; i < n; i++) {
-      pred = this.dependingJobs.get(i);
-      State s = pred.checkState();
-      if (s == State.WAITING || s == State.READY || s == State.RUNNING) {
-        break; // a pred is still not completed, continue in WAITING
-        // state
-      }
-      if (s == State.FAILED || s == State.DEPENDENT_FAILED) {
-        this.state = State.DEPENDENT_FAILED;
-        this.message = "depending job " + i + " with jobID " + pred.getJobID() + " failed. " + pred.getMessage();
-        break;
-      }
-      // pred must be in success state
-      if (i == n - 1) {
-        this.state = State.READY;
-      }
-    }
-
-    return this.state;
-  }
-
-  /**
-   * Submit this job to mapred. The state becomes RUNNING if submission is
-   * successful, FAILED otherwise.
-   */
-  protected synchronized void submit() {
-    try {
-      Configuration conf = job.getConfiguration();
-      if (conf.getBoolean(CREATE_DIR, false)) {
-        FileSystem fs = FileSystem.get(conf);
-        Path inputPaths[] = FileInputFormat.getInputPaths(job);
-        for (int i = 0; i < inputPaths.length; i++) {
-          if (!fs.exists(inputPaths[i])) {
-            try {
-              fs.mkdirs(inputPaths[i]);
-            } catch (IOException e) {
-
-            }
-          }
-        }
-      }
-      job.submit();
-      this.state = State.RUNNING;
-    } catch (Exception ioe) {
-      this.state = State.FAILED;
-      this.message = StringUtils.stringifyException(ioe);
-    }
-  }
-
-}

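For reference, a minimal sketch of how the dependency state machine in
CrunchControlledJob is typically wired up. The job names are illustrative, and
the two-argument constructor is assumed to mirror Hadoop's ControlledJob (a Job
plus an optional list of depending jobs):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public static CrunchControlledJob buildPipeline() throws IOException {
      Job first = new Job(new Configuration(), "first");
      Job second = new Job(new Configuration(), "second");

      CrunchControlledJob cFirst = new CrunchControlledJob(first, null);
      CrunchControlledJob cSecond = new CrunchControlledJob(second, null);

      // "second" moves WAITING -> READY only after "first" reaches SUCCESS;
      // if "first" fails, checkState() marks "second" as DEPENDENT_FAILED.
      cSecond.addDependingJob(cFirst);
      return cSecond;
    }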
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/4097823b/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
deleted file mode 100644
index 99bb324..0000000
--- a/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
+++ /dev/null
@@ -1,295 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapreduce.lib.jobcontrol;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.State;
-
-/**
- * This class encapsulates a set of MapReduce jobs and their dependencies.
- * 
- * It tracks the states of the jobs by placing them into different tables
- * according to their states.
- * 
- * This class provides APIs for the client app to add a job to the group and to
- * get the jobs in the group in different states. When a job is added, an ID
- * unique to the group is assigned to the job.
- * 
- * This class has a thread that submits jobs when they become ready, monitors
- * the states of the running jobs, and updates the states of jobs based on the
- * state changes of the jobs they depend on. The class provides APIs for
- * suspending, resuming, and stopping the thread.
- * 
- * TODO This is mostly a copy of the JobControl class in Hadoop MapReduce core.
- * Once the location and interface of the class are more stable in CDH, this
- * class should be removed completely and be based on the hadoop-core class.
- */
-public class CrunchJobControl implements Runnable {
-
-  // The thread can be in one of the following states
-  public static enum ThreadState {
-    RUNNING,
-    SUSPENDED,
-    STOPPED,
-    STOPPING,
-    READY
-  };
-
-  private ThreadState runnerState; // the thread state
-
-  private Map<String, CrunchControlledJob> waitingJobs;
-  private Map<String, CrunchControlledJob> readyJobs;
-  private Map<String, CrunchControlledJob> runningJobs;
-  private Map<String, CrunchControlledJob> successfulJobs;
-  private Map<String, CrunchControlledJob> failedJobs;
-
-  private long nextJobID;
-  private String groupName;
-
-  /**
-   * Construct a job control for a group of jobs.
-   * 
-   * @param groupName
-   *          a name identifying this group
-   */
-  public CrunchJobControl(String groupName) {
-    this.waitingJobs = new Hashtable<String, CrunchControlledJob>();
-    this.readyJobs = new Hashtable<String, CrunchControlledJob>();
-    this.runningJobs = new Hashtable<String, CrunchControlledJob>();
-    this.successfulJobs = new Hashtable<String, CrunchControlledJob>();
-    this.failedJobs = new Hashtable<String, CrunchControlledJob>();
-    this.nextJobID = -1;
-    this.groupName = groupName;
-    this.runnerState = ThreadState.READY;
-  }
-
-  private static List<CrunchControlledJob> toList(Map<String, CrunchControlledJob> jobs) {
-    ArrayList<CrunchControlledJob> retv = new ArrayList<CrunchControlledJob>();
-    synchronized (jobs) {
-      for (CrunchControlledJob job : jobs.values()) {
-        retv.add(job);
-      }
-    }
-    return retv;
-  }
-
-  /**
-   * @return the jobs in the waiting state
-   */
-  public List<CrunchControlledJob> getWaitingJobList() {
-    return toList(this.waitingJobs);
-  }
-
-  /**
-   * @return the jobs in the running state
-   */
-  public List<CrunchControlledJob> getRunningJobList() {
-    return toList(this.runningJobs);
-  }
-
-  /**
-   * @return the jobs in the ready state
-   */
-  public List<CrunchControlledJob> getReadyJobsList() {
-    return toList(this.readyJobs);
-  }
-
-  /**
-   * @return the jobs in the success state
-   */
-  public List<CrunchControlledJob> getSuccessfulJobList() {
-    return toList(this.successfulJobs);
-  }
-
-  public List<CrunchControlledJob> getFailedJobList() {
-    return toList(this.failedJobs);
-  }
-
-  private String getNextJobID() {
-    nextJobID += 1;
-    return this.groupName + this.nextJobID;
-  }
-
-  private static void addToQueue(CrunchControlledJob aJob, Map<String, CrunchControlledJob> queue) {
-    synchronized (queue) {
-      queue.put(aJob.getJobID(), aJob);
-    }
-  }
-
-  private void addToQueue(CrunchControlledJob aJob) {
-    Map<String, CrunchControlledJob> queue = getQueue(aJob.getJobState());
-    addToQueue(aJob, queue);
-  }
-
-  private Map<String, CrunchControlledJob> getQueue(State state) {
-    Map<String, CrunchControlledJob> retv = null;
-    if (state == State.WAITING) {
-      retv = this.waitingJobs;
-    } else if (state == State.READY) {
-      retv = this.readyJobs;
-    } else if (state == State.RUNNING) {
-      retv = this.runningJobs;
-    } else if (state == State.SUCCESS) {
-      retv = this.successfulJobs;
-    } else if (state == State.FAILED || state == State.DEPENDENT_FAILED) {
-      retv = this.failedJobs;
-    }
-    return retv;
-  }
-
-  /**
-   * Add a new job.
-   * 
-   * @param aJob
-   *          the new job
-   */
-  synchronized public String addJob(CrunchControlledJob aJob) {
-    String id = this.getNextJobID();
-    aJob.setJobID(id);
-    aJob.setJobState(State.WAITING);
-    this.addToQueue(aJob);
-    return id;
-  }
-
-  /**
-   * Add a collection of jobs
-   * 
-   * @param jobs
-   */
-  public void addJobCollection(Collection<CrunchControlledJob> jobs) {
-    for (CrunchControlledJob job : jobs) {
-      addJob(job);
-    }
-  }
-
-  /**
-   * @return the thread state
-   */
-  public ThreadState getThreadState() {
-    return this.runnerState;
-  }
-
-  /**
-   * Set the thread state to STOPPING so that the thread will stop when it
-   * wakes up.
-   */
-  public void stop() {
-    this.runnerState = ThreadState.STOPPING;
-  }
-
-  /**
-   * Suspend the running thread.
-   */
-  public void suspend() {
-    if (this.runnerState == ThreadState.RUNNING) {
-      this.runnerState = ThreadState.SUSPENDED;
-    }
-  }
-
-  /**
-   * Resume the suspended thread.
-   */
-  public void resume() {
-    if (this.runnerState == ThreadState.SUSPENDED) {
-      this.runnerState = ThreadState.RUNNING;
-    }
-  }
-
-  synchronized private void checkRunningJobs() throws IOException, InterruptedException {
-
-    Map<String, CrunchControlledJob> oldJobs = this.runningJobs;
-    this.runningJobs = new Hashtable<String, CrunchControlledJob>();
-
-    for (CrunchControlledJob nextJob : oldJobs.values()) {
-      nextJob.checkState();
-      this.addToQueue(nextJob);
-    }
-  }
-
-  synchronized private void checkWaitingJobs() throws IOException, InterruptedException {
-    Map<String, CrunchControlledJob> oldJobs = this.waitingJobs;
-    this.waitingJobs = new Hashtable<String, CrunchControlledJob>();
-
-    for (CrunchControlledJob nextJob : oldJobs.values()) {
-      nextJob.checkState();
-      this.addToQueue(nextJob);
-    }
-  }
-
-  synchronized private void startReadyJobs() {
-    Map<String, CrunchControlledJob> oldJobs = this.readyJobs;
-    this.readyJobs = new Hashtable<String, CrunchControlledJob>();
-
-    for (CrunchControlledJob nextJob : oldJobs.values()) {
-      // Submitting Job to Hadoop
-      nextJob.submit();
-      this.addToQueue(nextJob);
-    }
-  }
-
-  synchronized public boolean allFinished() {
-    return this.waitingJobs.size() == 0 && this.readyJobs.size() == 0 && this.runningJobs.size() == 0;
-  }
-
-  /**
-   * The main loop for the thread. On each iteration the loop: checks the
-   * states of the running jobs, updates the states of the waiting jobs, and
-   * submits the jobs in the ready state.
-   */
-  public void run() {
-    this.runnerState = ThreadState.RUNNING;
-    while (true) {
-      while (this.runnerState == ThreadState.SUSPENDED) {
-        try {
-          Thread.sleep(5000);
-        } catch (Exception e) {
-          // Interrupted while suspended; loop around and re-check the state.
-        }
-      }
-      try {
-        checkRunningJobs();
-        checkWaitingJobs();
-        startReadyJobs();
-      } catch (Exception e) {
-        // Any failure while checking or submitting jobs stops the runner.
-        this.runnerState = ThreadState.STOPPED;
-      }
-      if (this.runnerState != ThreadState.RUNNING && this.runnerState != ThreadState.SUSPENDED) {
-        break;
-      }
-      try {
-        Thread.sleep(5000);
-      } catch (Exception e) {
-        // Interrupted while sleeping between polls; fall through and re-check.
-      }
-      if (this.runnerState != ThreadState.RUNNING && this.runnerState != ThreadState.SUSPENDED) {
-        break;
-      }
-    }
-    this.runnerState = ThreadState.STOPPED;
-  }
-
-}

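Driving the control loop above needs only methods that appear in this class; a
minimal sketch follows (the group name and poll interval are illustrative):

    import java.util.List;

    public static void runGroup(CrunchControlledJob... jobs) throws InterruptedException {
      CrunchJobControl control = new CrunchJobControl("example-group");
      for (CrunchControlledJob job : jobs) {
        control.addJob(job);
      }
      // run() is the Runnable main loop, so it gets its own thread.
      Thread runner = new Thread(control);
      runner.setDaemon(true);
      runner.start();
      // allFinished() is true once the waiting, ready, and running queues drain.
      while (!control.allFinished()) {
        Thread.sleep(1000);
      }
      control.stop();
      List<CrunchControlledJob> failed = control.getFailedJobList();
      System.out.println(failed.size() + " job(s) failed");
    }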
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/4097823b/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java b/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java
deleted file mode 100644
index 39b95b5..0000000
--- a/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java
+++ /dev/null
@@ -1,471 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapreduce.lib.output;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.StringTokenizer;
-
-import org.apache.crunch.impl.mr.run.TaskAttemptContextFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.hadoop.util.ReflectionUtils;
-
-/**
- * The CrunchMultipleOutputs class simplifies writing output data to multiple outputs.
- * 
- * <p>
- * Case one: writing to additional outputs other than the job default output.
- * 
- * Each additional output, or named output, may be configured with its own
- * <code>OutputFormat</code>, with its own key class and with its own value
- * class.
- * 
- * <p>
- * Case two: writing data to different files whose names are provided by the user.
- * </p>
- * 
- * <p>
- * CrunchMultipleOutputs supports counters, which are disabled by default. The
- * counters group is the {@link CrunchMultipleOutputs} class name, and the
- * names of the counters are the same as the output names. They count the
- * number of records written to each output name.
- * </p>
- * 
- * Usage pattern for job submission:
- * 
- * <pre>
- * 
- * Job job = new Job();
- * 
- * FileInputFormat.setInputPath(job, inDir);
- * FileOutputFormat.setOutputPath(job, outDir);
- * 
- * job.setMapperClass(MOMap.class);
- * job.setReducerClass(MOReduce.class);
- * ...
- * 
- * // Defines additional single text based output 'text' for the job
- * MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
- * LongWritable.class, Text.class);
- * 
- * // Defines additional sequence-file based output 'sequence' for the job
- * MultipleOutputs.addNamedOutput(job, "seq",
- *   SequenceFileOutputFormat.class,
- *   LongWritable.class, Text.class);
- * ...
- * 
- * job.waitForCompletion(true);
- * ...
- * </pre>
- * <p>
- * Usage in Reducer:
- * 
- * <pre>
- * <K, V> String generateFileName(K k, V v) {
- *   return k.toString() + "_" + v.toString();
- * }
- * 
- * public class MOReduce extends
- *     Reducer&lt;WritableComparable, Writable, WritableComparable, Writable&gt; {
- *   private MultipleOutputs mos;
- * 
- *   public void setup(Context context) {
- *     ...
- *     mos = new MultipleOutputs(context);
- *   }
- * 
- *   public void reduce(WritableComparable key, Iterator&lt;Writable&gt; values,
- *       Context context) throws IOException {
- *     ...
- *     mos.write("text", key, new Text("Hello"));
- *     mos.write("seq", new LongWritable(1), new Text("Bye"), "seq_a");
- *     mos.write("seq", new LongWritable(2), new Text("Chau"), "seq_b");
- *     mos.write(key, new Text("value"), generateFileName(key, new Text("value")));
- *     ...
- *   }
- * 
- *   public void cleanup(Context context) throws IOException {
- *     mos.close();
- *     ...
- *   }
- * }
- * </pre>
- */
-public class CrunchMultipleOutputs<KEYOUT, VALUEOUT> {
-
-  private static final String MULTIPLE_OUTPUTS = "mapreduce.multipleoutputs";
-
-  private static final String MO_PREFIX = "mapreduce.multipleoutputs.namedOutput.";
-
-  private static final String FORMAT = ".format";
-  private static final String KEY = ".key";
-  private static final String VALUE = ".value";
-  private static final String COUNTERS_ENABLED = "mapreduce.multipleoutputs.counters";
-
-  private static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
-
-  /**
-   * Counters group used by the counters of MultipleOutputs.
-   */
-  private static final String COUNTERS_GROUP = CrunchMultipleOutputs.class.getName();
-
-  /**
-   * Cache for the taskContexts
-   */
-  private Map<String, TaskAttemptContext> taskContexts = new HashMap<String, TaskAttemptContext>();
-
-  /**
-   * Checks if a named output name is a valid token.
-   * 
-   * @param namedOutput
-   *          named output Name
-   * @throws IllegalArgumentException
-   *           if the output name is not valid.
-   */
-  private static void checkTokenName(String namedOutput) {
-    if (namedOutput == null || namedOutput.length() == 0) {
-      throw new IllegalArgumentException("Name cannot be NULL or emtpy");
-    }
-    for (char ch : namedOutput.toCharArray()) {
-      if ((ch >= 'A') && (ch <= 'Z')) {
-        continue;
-      }
-      if ((ch >= 'a') && (ch <= 'z')) {
-        continue;
-      }
-      if ((ch >= '0') && (ch <= '9')) {
-        continue;
-      }
-      throw new IllegalArgumentException("Name cannot be have a '" + ch + "' char");
-    }
-  }
-
-  /**
-   * Checks if output name is valid.
-   * 
-   * name cannot be the name used for the default output
-   * 
-   * @param outputPath
-   *          base output Name
-   * @throws IllegalArgumentException
-   *           if the output name is not valid.
-   */
-  private static void checkBaseOutputPath(String outputPath) {
-    if (outputPath.equals(FileOutputFormat.PART)) {
-      throw new IllegalArgumentException("output name cannot be 'part'");
-    }
-  }
-
-  /**
-   * Checks if a named output name is valid.
-   * 
-   * @param namedOutput
-   *          named output Name
-   * @throws IllegalArgumentException
-   *           if the output name is not valid.
-   */
-  private static void checkNamedOutputName(JobContext job, String namedOutput, boolean alreadyDefined) {
-    checkTokenName(namedOutput);
-    checkBaseOutputPath(namedOutput);
-    List<String> definedChannels = getNamedOutputsList(job);
-    if (alreadyDefined && definedChannels.contains(namedOutput)) {
-      throw new IllegalArgumentException("Named output '" + namedOutput + "' already alreadyDefined");
-    } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) {
-      throw new IllegalArgumentException("Named output '" + namedOutput + "' not defined");
-    }
-  }
-
-  // Returns list of channel names.
-  private static List<String> getNamedOutputsList(JobContext job) {
-    List<String> names = new ArrayList<String>();
-    StringTokenizer st = new StringTokenizer(job.getConfiguration().get(MULTIPLE_OUTPUTS, ""), " ");
-    while (st.hasMoreTokens()) {
-      names.add(st.nextToken());
-    }
-    return names;
-  }
-
-  // Returns the named output OutputFormat.
-  @SuppressWarnings("unchecked")
-  private static Class<? extends OutputFormat<?, ?>> getNamedOutputFormatClass(JobContext job, String namedOutput) {
-    return (Class<? extends OutputFormat<?, ?>>) job.getConfiguration().getClass(MO_PREFIX + namedOutput + FORMAT,
-        null, OutputFormat.class);
-  }
-
-  // Returns the key class for a named output.
-  private static Class<?> getNamedOutputKeyClass(JobContext job, String namedOutput) {
-    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + KEY, null, Object.class);
-  }
-
-  // Returns the value class for a named output.
-  private static Class<?> getNamedOutputValueClass(JobContext job, String namedOutput) {
-    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + VALUE, null, Object.class);
-  }
-
-  /**
-   * Adds a named output for the job.
-   * <p/>
-   * 
-   * @param job
-   *          job to add the named output
-   * @param namedOutput
-   *          named output name; it has to be a single word of letters and
-   *          numbers only, and cannot be the word 'part', as that is reserved
-   *          for the default output.
-   * @param outputFormatClass
-   *          OutputFormat class.
-   * @param keyClass
-   *          key class
-   * @param valueClass
-   *          value class
-   */
-  public static void addNamedOutput(Job job, String namedOutput, Class<? extends OutputFormat> outputFormatClass,
-      Class<?> keyClass, Class<?> valueClass) {
-    checkNamedOutputName(job, namedOutput, true);
-    Configuration conf = job.getConfiguration();
-    conf.set(MULTIPLE_OUTPUTS, conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput);
-    conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass, OutputFormat.class);
-    conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class);
-    conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class);
-  }
-
-  /**
-   * Enables or disables counters for the named outputs.
-   * 
-   * The counters group is the {@link CrunchMultipleOutputs} class name. The
-   * names of the counters are the same as the named outputs. These counters
-   * count the number of records written to each output name. By default these
-   * counters are disabled.
-   * 
-   * @param job
-   *          job to enable counters
-   * @param enabled
-   *          indicates if the counters will be enabled or not.
-   */
-  public static void setCountersEnabled(Job job, boolean enabled) {
-    job.getConfiguration().setBoolean(COUNTERS_ENABLED, enabled);
-  }
-
-  /**
-   * Returns whether the counters for the named outputs are enabled. By
-   * default these counters are disabled.
-   * 
-   * @param job
-   *          the job
-   * @return TRUE if the counters are enabled, FALSE if they are disabled.
-   */
-  public static boolean getCountersEnabled(JobContext job) {
-    return job.getConfiguration().getBoolean(COUNTERS_ENABLED, false);
-  }
-
-  /**
-   * Wraps RecordWriter to increment counters.
-   */
-  @SuppressWarnings("unchecked")
-  private static class RecordWriterWithCounter extends RecordWriter {
-    private RecordWriter writer;
-    private String counterName;
-    private TaskInputOutputContext context;
-
-    public RecordWriterWithCounter(RecordWriter writer, String counterName, TaskInputOutputContext context) {
-      this.writer = writer;
-      this.counterName = counterName;
-      this.context = context;
-    }
-
-    @SuppressWarnings({ "unchecked" })
-    public void write(Object key, Object value) throws IOException, InterruptedException {
-      context.getCounter(COUNTERS_GROUP, counterName).increment(1);
-      writer.write(key, value);
-    }
-
-    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
-      writer.close(context);
-    }
-  }
-
-  // instance code, to be used from Mapper/Reducer code
-
-  private TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context;
-  private Set<String> namedOutputs;
-  private Map<String, RecordWriter<?, ?>> recordWriters;
-  private boolean countersEnabled;
-
-  /**
-   * Creates and initializes multiple outputs support; it should be
-   * instantiated in the Mapper/Reducer setup method.
-   * 
-   * @param context
-   *          the TaskInputOutputContext object
-   */
-  public CrunchMultipleOutputs(TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
-    this.context = context;
-    namedOutputs = Collections.unmodifiableSet(new HashSet<String>(CrunchMultipleOutputs.getNamedOutputsList(context)));
-    recordWriters = new HashMap<String, RecordWriter<?, ?>>();
-    countersEnabled = getCountersEnabled(context);
-  }
-
-  /**
-   * Write key and value to the namedOutput.
-   * 
-   * Output path is a unique file generated for the namedOutput. For example,
-   * {namedOutput}-(m|r)-{part-number}
-   * 
-   * @param namedOutput
-   *          the named output name
-   * @param key
-   *          the key
-   * @param value
-   *          the value
-   */
-  @SuppressWarnings("unchecked")
-  public <K, V> void write(String namedOutput, K key, V value) throws IOException, InterruptedException {
-    write(namedOutput, key, value, namedOutput);
-  }
-
-  /**
-   * Write key and value to baseOutputPath using the namedOutput.
-   * 
-   * @param namedOutput
-   *          the named output name
-   * @param key
-   *          the key
-   * @param value
-   *          the value
-   * @param baseOutputPath
-   *          base-output path to write the record to. Note: the framework will
-   *          generate a unique filename for the baseOutputPath
-   */
-  @SuppressWarnings("unchecked")
-  public <K, V> void write(String namedOutput, K key, V value, String baseOutputPath) throws IOException,
-      InterruptedException {
-    checkNamedOutputName(context, namedOutput, false);
-    checkBaseOutputPath(baseOutputPath);
-    if (!namedOutputs.contains(namedOutput)) {
-      throw new IllegalArgumentException("Undefined named output '" + namedOutput + "'");
-    }
-    TaskAttemptContext taskContext = getContext(namedOutput);
-    getRecordWriter(taskContext, baseOutputPath).write(key, value);
-  }
-
-  /**
-   * Write key value to an output file name.
-   * 
-   * Gets the record writer from job's output format. Job's output format should
-   * be a FileOutputFormat.
-   * 
-   * @param key
-   *          the key
-   * @param value
-   *          the value
-   * @param baseOutputPath
-   *          base-output path to write the record to. Note: the framework will
-   *          generate a unique filename for the baseOutputPath
-   */
-  @SuppressWarnings("unchecked")
-  public void write(KEYOUT key, VALUEOUT value, String baseOutputPath) throws IOException, InterruptedException {
-    checkBaseOutputPath(baseOutputPath);
-    TaskAttemptContext taskContext = TaskAttemptContextFactory.create(context.getConfiguration(),
-        context.getTaskAttemptID());
-    getRecordWriter(taskContext, baseOutputPath).write(key, value);
-  }
-
-  // By being synchronized, this method allows CrunchMultipleOutputs to be
-  // used with a MultithreadedMapper.
-  @SuppressWarnings("unchecked")
-  private synchronized RecordWriter getRecordWriter(TaskAttemptContext taskContext, String baseFileName)
-      throws IOException, InterruptedException {
-
-    // look for record-writer in the cache
-    RecordWriter writer = recordWriters.get(baseFileName);
-
-    // If not in cache, create a new one
-    if (writer == null) {
-      // get the record writer from context output format
-      taskContext.getConfiguration().set(BASE_OUTPUT_NAME, baseFileName);
-      try {
-        writer = ((OutputFormat) ReflectionUtils.newInstance(taskContext.getOutputFormatClass(),
-            taskContext.getConfiguration())).getRecordWriter(taskContext);
-      } catch (ClassNotFoundException e) {
-        throw new IOException(e);
-      }
-
-      // if counters are enabled, wrap the writer with context
-      // to increment counters
-      if (countersEnabled) {
-        writer = new RecordWriterWithCounter(writer, baseFileName, context);
-      }
-
-      // add the record-writer to the cache
-      recordWriters.put(baseFileName, writer);
-    }
-    return writer;
-  }
-
-  // Create a taskAttemptContext for the named output with
-  // output format and output key/value types put in the context
-  private TaskAttemptContext getContext(String nameOutput) throws IOException {
-
-    TaskAttemptContext taskContext = taskContexts.get(nameOutput);
-
-    if (taskContext != null) {
-      return taskContext;
-    }
-
-    // The following trick leverages the instantiation of a record writer via
-    // the job thus supporting arbitrary output formats.
-    Job job = new Job(context.getConfiguration());
-    job.getConfiguration().set("crunch.namedoutput", nameOutput);
-    job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
-    job.setOutputKeyClass(getNamedOutputKeyClass(context, nameOutput));
-    job.setOutputValueClass(getNamedOutputValueClass(context, nameOutput));
-    taskContext = TaskAttemptContextFactory.create(job.getConfiguration(), context.getTaskAttemptID());
-
-    taskContexts.put(nameOutput, taskContext);
-
-    return taskContext;
-  }
-
-  /**
-   * Closes all the opened outputs.
-   * 
-   * This should be called from the cleanup method of a map/reduce task. If
-   * overridden, subclasses must invoke <code>super.close()</code> at the end
-   * of their <code>close()</code>.
-   * 
-   */
-  @SuppressWarnings("unchecked")
-  public void close() throws IOException, InterruptedException {
-    for (RecordWriter writer : recordWriters.values()) {
-      writer.close(context);
-    }
-  }
-}

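For context on the static configuration half of the class, a minimal sketch of
wiring a named output into a job. The output name "stats" and the key/value
classes are illustrative; the calls match addNamedOutput and
setCountersEnabled as defined in the diff above:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    public static Job configureJob() throws IOException {
      Job job = new Job(new Configuration());
      // Register an additional sequence-file output named "stats".
      CrunchMultipleOutputs.addNamedOutput(job, "stats",
          SequenceFileOutputFormat.class, Text.class, LongWritable.class);
      // Opt in to per-named-output record counters (disabled by default).
      CrunchMultipleOutputs.setCountersEnabled(job, true);
      return job;
    }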

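And the instance half, mirroring the usage pattern from the class javadoc; the
reducer and its key/value types are hypothetical:

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class StatsReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
      private CrunchMultipleOutputs<Text, LongWritable> mos;

      @Override
      protected void setup(Context context) {
        mos = new CrunchMultipleOutputs<Text, LongWritable>(context);
      }

      @Override
      protected void reduce(Text key, Iterable<LongWritable> values, Context context)
          throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable v : values) {
          sum += v.get();
        }
        // Route the record to the "stats" named output registered at job setup.
        mos.write("stats", key, new LongWritable(sum));
      }

      @Override
      protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close(); // flushes and closes every cached RecordWriter
      }
    }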