hadoop-mapreduce-commits mailing list archives

From acmur...@apache.org
Subject svn commit: r816496 [3/4] - in /hadoop/mapreduce/trunk: ./ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/ src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ src...
Date Fri, 18 Sep 2009 07:04:45 GMT
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Cluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Cluster.java?rev=816496&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Cluster.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Cluster.java Fri Sep 18 07:04:42 2009
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.LocalJobRunner;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.server.jobtracker.State;
+import org.apache.hadoop.net.NetUtils;
+
+/**
+ * Provides a way to access information about the map/reduce cluster.
+ */
+public class Cluster {
+  private ClientProtocol client;
+  private Configuration conf;
+  private FileSystem fs = null;
+  private Path sysDir = null;
+
+  static {
+    Configuration.addDefaultResource("mapred-default.xml");
+    Configuration.addDefaultResource("mapred-site.xml");
+  }
+  
+  public Cluster(Configuration conf) throws IOException {
+    this.conf = conf;
+    client = createClient(conf);
+  }
+
+  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
+      throws IOException {
+    this.conf = conf;
+    client = createRPCProxy(jobTrackAddr, conf);
+  }
+
+  private ClientProtocol createRPCProxy(InetSocketAddress addr,
+      Configuration conf) throws IOException {
+    return (ClientProtocol) RPC.getProxy(ClientProtocol.class,
+      ClientProtocol.versionID, addr, Job.getUGI(conf), conf,
+      NetUtils.getSocketFactory(conf, ClientProtocol.class));
+  }
+
+  private ClientProtocol createClient(Configuration conf) throws IOException {
+    ClientProtocol client;
+    String tracker = conf.get("mapred.job.tracker", "local");
+    if ("local".equals(tracker)) {
+      client = new LocalJobRunner(conf);
+    } else {
+      client = createRPCProxy(JobTracker.getAddress(conf), conf);
+    }
+    return client;
+  }
+  
+  ClientProtocol getClient() {
+    return client;
+  }
+  
+  Configuration getConf() {
+    return conf;
+  }
+  
+  /**
+   * Close the <code>Cluster</code>.
+   */
+  public synchronized void close() throws IOException {
+    if (!(client instanceof LocalJobRunner)) {
+      RPC.stopProxy(client);
+    }
+  }
+
+  private Job[] getJobs(JobStatus[] stats) throws IOException {
+    List<Job> jobs = new ArrayList<Job>();
+    for (JobStatus stat : stats) {
+      jobs.add(new Job(this, stat, new JobConf(stat.getJobFile())));
+    }
+    return jobs.toArray(new Job[0]);
+  }
+
+  /**
+   * Get the file system where job-specific files are stored.
+   * 
+   * @return the {@link FileSystem} for job-specific files
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public synchronized FileSystem getFileSystem() 
+      throws IOException, InterruptedException {
+    if (this.fs == null) {
+      Path sysDir = new Path(client.getSystemDir());
+      this.fs = sysDir.getFileSystem(getConf());
+    }
+    return fs;
+  }
+
+  /**
+   * Get the job corresponding to the given job id.
+   * 
+   * @param jobId the id of the job
+   * @return the {@link Job}, or <code>null</code> if the job is not known
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public Job getJob(JobID jobId) throws IOException, InterruptedException {
+    JobStatus status = client.getJobStatus(jobId);
+    if (status != null) {
+      return new Job(this, status, new JobConf(status.getJobFile()));
+    }
+    return null;
+  }
+  
+  /**
+   * Get all the queues in cluster.
+   * 
+   * @return array of {@link QueueInfo}
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public QueueInfo[] getQueues() throws IOException, InterruptedException {
+    return client.getQueues();
+  }
+  
+  /**
+   * Get queue information for the specified name.
+   * 
+   * @param name the queue name
+   * @return the {@link QueueInfo} for the given queue name
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public QueueInfo getQueue(String name) 
+      throws IOException, InterruptedException {
+    return client.getQueue(name);
+  }
+  
+  /**
+   * Get current cluster status.
+   * 
+   * @return a {@link ClusterMetrics} snapshot of the cluster
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public ClusterMetrics getClusterStatus() throws IOException, InterruptedException {
+    return client.getClusterMetrics();
+  }
+  
+  /**
+   * Get all active trackers in the cluster.
+   * 
+   * @return array of {@link TaskTrackerInfo}
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public TaskTrackerInfo[] getActiveTaskTrackers() 
+      throws IOException, InterruptedException  {
+    return client.getActiveTrackers();
+  }
+  
+  /**
+   * Get blacklisted trackers.
+   * 
+   * @return array of {@link TaskTrackerInfo}
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public TaskTrackerInfo[] getBlackListedTaskTrackers() 
+      throws IOException, InterruptedException  {
+    return client.getBlacklistedTrackers();
+  }
+  
+  /**
+   * Get all the jobs in cluster.
+   * 
+   * @return array of {@link Job}
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public Job[] getAllJobs() throws IOException, InterruptedException {
+    return getJobs(client.getAllJobs());
+  }
+  
+  /**
+   * Grab the jobtracker system directory path where 
+   * job-specific files will be placed.
+   * 
+   * @return the system directory where job-specific files are to be placed.
+   */
+  public Path getSystemDir() throws IOException, InterruptedException {
+    if (sysDir == null) {
+      sysDir = new Path(client.getSystemDir());
+    }
+    return sysDir;
+  }
+
+  /**
+   * Gets the Queue ACLs for the current user.
+   * @return array of {@link QueueAclsInfo} objects for the current user.
+   * @throws IOException
+   */
+  public QueueAclsInfo[] getQueueAclsForCurrentUser() 
+      throws IOException, InterruptedException  {
+    return client.getQueueAclsForCurrentUser();
+  }
+
+  /**
+   * Gets the root-level queues.
+   * @return array of {@link QueueInfo} objects for the root-level queues.
+   * @throws IOException
+   */
+  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
+    return client.getRootQueues();
+  }
+  
+  /**
+   * Returns the immediate children of queueName.
+   * @param queueName the name of the parent queue
+   * @return array of {@link QueueInfo} which are children of queueName
+   * @throws IOException
+   */
+  public QueueInfo[] getChildQueues(String queueName) 
+      throws IOException, InterruptedException {
+    return client.getChildQueues(queueName);
+  }
+
+  /**
+   * Get JobTracker's state
+   * 
+   * @return {@link State} of the JobTracker
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public State getJobTrackerState() throws IOException, InterruptedException {
+    return client.getJobTrackerState();
+  }
+  
+  /**
+   * Get the tasktracker expiry interval for the cluster
+   * @return the expiry interval in msec
+   */
+  public long getTaskTrackerExpiryInterval() throws IOException,
+      InterruptedException {
+    return client.getTaskTrackerExpiryInterval();
+  }
+
+}

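A minimal usage sketch for the new Cluster API added above. The wrapper class ClusterInfoExample and its main() method are illustrative only; the Cluster constructor, getSystemDir(), getAllJobs() and close() calls are the ones defined in the file above.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Cluster;
    import org.apache.hadoop.mapreduce.Job;

    public class ClusterInfoExample {
      public static void main(String[] args)
          throws IOException, InterruptedException {
        // mapred-default.xml / mapred-site.xml are picked up by Cluster's
        // static initializer; "mapred.job.tracker" decides between the
        // LocalJobRunner and an RPC proxy to the JobTracker.
        Configuration conf = new Configuration();
        Cluster cluster = new Cluster(conf);
        try {
          // Directory where job-specific files are staged.
          System.out.println("System dir: " + cluster.getSystemDir());
          // All jobs currently known to the cluster.
          Job[] jobs = cluster.getAllJobs();
          System.out.println("Jobs: " + jobs.length);
        } finally {
          cluster.close(); // stops the RPC proxy unless running locally
        }
      }
    }
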
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java?rev=816496&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java Fri Sep 18 07:04:42 2009
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Status information on the current state of the Map-Reduce cluster.
+ * 
+ * <p><code>ClusterMetrics</code> provides clients with information such as:
+ * <ol>
+ *   <li>
+ *   Size of the cluster.  
+ *   </li>
+ *   <li>
+ *   Number of blacklisted and decommissioned trackers.  
+ *   </li>
+ *   <li>
+ *   Task capacity of the cluster. 
+ *   </li>
+ *   <li>
+ *   The number of currently running map and reduce tasks.
+ *   </li>
+ * </ol></p>
+ * 
+ * <p>Clients can query for the latest <code>ClusterMetrics</code>, via 
+ * {@link Cluster#getClusterStatus()}.</p>
+ * 
+ * @see Cluster
+ */
+public class ClusterMetrics implements Writable {
+  int runningMaps;
+  int runningReduces;
+  int mapSlots;
+  int reduceSlots;
+  int numTrackers;
+  int numBlacklistedTrackers;
+  int numDecommissionedTrackers;
+
+  public ClusterMetrics() {
+  }
+  
+  public ClusterMetrics(int runningMaps, int runningReduces, int mapSlots, 
+    int reduceSlots, int numTrackers, int numBlacklistedTrackers,
+    int numDecommissionedNodes) {
+    this.runningMaps = runningMaps;
+    this.runningReduces = runningReduces;
+    this.mapSlots = mapSlots;
+    this.reduceSlots = reduceSlots;
+    this.numTrackers = numTrackers;
+    this.numBlacklistedTrackers = numBlacklistedTrackers;
+    this.numDecommissionedTrackers = numDecommissionedNodes;
+  }
+  
+  public int getOccupiedMapSlots() { 
+    return runningMaps;
+  }
+  
+  public int getOccupiedReduceSlots() { 
+    return runningReduces; 
+  }
+  
+  public int getMapSlotCapacity() {
+    return mapSlots;
+  }
+  
+  public int getReduceSlotCapacity() {
+    return reduceSlots;
+  }
+  
+  public int getTaskTrackerCount() {
+    return numTrackers;
+  }
+  
+  public int getBlackListedTaskTrackerCount() {
+    return numBlacklistedTrackers;
+  }
+  
+  public int getDecommissionedTaskTrackerCount() {
+    return numDecommissionedTrackers;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    runningMaps = in.readInt();
+    runningReduces = in.readInt();
+    mapSlots = in.readInt();
+    reduceSlots = in.readInt();
+    numTrackers = in.readInt();
+    numBlacklistedTrackers = in.readInt();
+    numDecommissionedTrackers = in.readInt();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(runningMaps);
+    out.writeInt(runningReduces);
+    out.writeInt(mapSlots);
+    out.writeInt(reduceSlots);
+    out.writeInt(numTrackers);
+    out.writeInt(numBlacklistedTrackers);
+    out.writeInt(numDecommissionedTrackers);
+  }
+
+}

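A short sketch of reading the new ClusterMetrics snapshot through Cluster#getClusterStatus(). The ClusterMetricsExample wrapper is illustrative; the getters are the ones defined in the file above.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Cluster;
    import org.apache.hadoop.mapreduce.ClusterMetrics;

    public class ClusterMetricsExample {
      public static void main(String[] args) throws Exception {
        Cluster cluster = new Cluster(new Configuration());
        try {
          // One RPC returns the whole snapshot; the getters below just read
          // the deserialized fields.
          ClusterMetrics metrics = cluster.getClusterStatus();
          System.out.println("TaskTrackers     : " + metrics.getTaskTrackerCount());
          System.out.println("Blacklisted      : " + metrics.getBlackListedTaskTrackerCount());
          System.out.println("Decommissioned   : " + metrics.getDecommissionedTaskTrackerCount());
          System.out.println("Map slots used   : " + metrics.getOccupiedMapSlots()
              + " / " + metrics.getMapSlotCapacity());
          System.out.println("Reduce slots used: " + metrics.getOccupiedReduceSlots()
              + " / " + metrics.getReduceSlotCapacity());
        } finally {
          cluster.close();
        }
      }
    }
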
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java?rev=816496&r1=816495&r2=816496&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java Fri Sep 18 07:04:42 2009
@@ -18,45 +18,146 @@
 
 package org.apache.hadoop.mapreduce;
 
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.net.URL;
+import java.net.URLConnection;
+import java.util.Arrays;
 import java.net.URI;
 
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.TaskCompletionEvent;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
 
 /**
- * The job submitter's view of the Job. It allows the user to configure the
+ * The job submitter's view of the Job.
+ * 
+ * <p>It allows the user to configure the
  * job, submit it, control its execution, and query the state. The set methods
  * only work until the job is submitted, afterwards they will throw an 
- * IllegalStateException.
+ * IllegalStateException. </p>
+ * 
+ * <p>
+ * Normally the user creates the application, describes various facets of the
+ * job via {@link Job}, and then submits the job and monitors its progress.</p>
+ * 
+ * <p>Here is an example on how to submit a job:</p>
+ * <p><blockquote><pre>
+ *     // Create a new Job
+ *     Job job = new Job(new Configuration());
+ *     job.setJarByClass(MyJob.class);
+ *     
+ *     // Specify various job-specific parameters     
+ *     job.setJobName("myjob");
+ *     
+ *     job.setInputPath(new Path("in"));
+ *     job.setOutputPath(new Path("out"));
+ *     
+ *     job.setMapperClass(MyJob.MyMapper.class);
+ *     job.setReducerClass(MyJob.MyReducer.class);
+ *
+ *     // Submit the job, then poll for progress until the job is complete
+ *     job.waitForCompletion(true);
+ * </pre></blockquote></p>
+ * 
+ * 
  */
 public class Job extends JobContext {  
+  private static final Log LOG = LogFactory.getLog(Job.class);
   public static enum JobState {DEFINE, RUNNING};
-  private JobState state = JobState.DEFINE;
-  private JobClient jobClient;
-  private RunningJob info;
+  private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
+  public static final String OUTPUT_FILTER = "jobclient.output.filter";
+  /** Key in mapred-*.xml that sets completionPollIntervalMillis */
+  public static final String COMPLETION_POLL_INTERVAL_KEY = 
+    "jobclient.completion.poll.interval";
+  
+  /** Default completionPollIntervalMillis is 5000 ms. */
+  static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
+  /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */
+  public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
+      "jobclient.progress.monitor.poll.interval";
+  /** Default progMonitorPollIntervalMillis is 1000 ms. */
+  static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;
+
+  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
+
+  static {
+    Configuration.addDefaultResource("mapred-default.xml");
+    Configuration.addDefaultResource("mapred-site.xml");
+  }
 
+  private JobState state = JobState.DEFINE;
+  private JobStatus status;
+  private long statustime;
+  private Cluster cluster;
+  
+  @Deprecated
   public Job() throws IOException {
     this(new Configuration());
   }
 
+  @Deprecated
   public Job(Configuration conf) throws IOException {
-    super(conf, null);
-    jobClient = new JobClient((JobConf) getConfiguration());
+    this(new Cluster(conf), conf);
   }
 
+  @Deprecated
   public Job(Configuration conf, String jobName) throws IOException {
     this(conf);
     setJobName(jobName);
   }
 
+  Job(Cluster cluster) throws IOException {
+    this(cluster, new Configuration());
+  }
+
+  Job(Cluster cluster, Configuration conf) throws IOException {
+    super(conf, null);
+    this.cluster = cluster;
+  }
+
+  Job(Cluster cluster, JobStatus status,
+             Configuration conf) throws IOException {
+    this(cluster, conf);
+    state = JobState.RUNNING;
+    this.status = status;
+  }
+  
+  public static Job getInstance(Cluster cluster) throws IOException {
+     return new Job(cluster);
+  }
+  
+  public static Job getInstance(Cluster cluster, Configuration conf) 
+      throws IOException {
+    return new Job(cluster, conf);
+  }
+  
+  public static Job getInstance(Cluster cluster, JobStatus status, 
+      Configuration conf) throws IOException {
+    return new Job(cluster, status, conf);
+  }
+  
   private void ensureState(JobState state) throws IllegalStateException {
     if (state != this.state) {
       throw new IllegalStateException("Job in state "+ this.state + 
@@ -65,6 +166,351 @@
   }
 
   /**
+   * Some methods rely on having a recent job status object. Refresh
+   * it if necessary.
+   */
+  synchronized void ensureFreshStatus() 
+      throws IOException, InterruptedException {
+    if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
+      updateStatus();
+    }
+  }
+    
+  /** Some methods need the job status to be refreshed immediately, so
+   * fetch it from the cluster right away.
+   * @throws IOException
+   */
+  synchronized void updateStatus() throws IOException, InterruptedException {
+    this.status = cluster.getClient().getJobStatus(status.getJobID());
+    if (this.status == null) {
+      throw new IOException("Job status not available ");
+    }
+    this.statustime = System.currentTimeMillis();
+  }
+  
+  public JobStatus getStatus() throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    updateStatus();
+    return status;
+  }
+  /**
+   * Get the job identifier.
+   * 
+   * @return the job identifier.
+   */
+  public JobID getID() {
+    ensureState(JobState.RUNNING);
+    return status.getJobID();
+  }
+
+  /**
+   * Returns the current state of the Job.
+   * 
+   * @return JobStatus#State
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public JobStatus.State getJobState() 
+      throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    updateStatus();
+    return status.getState();
+  }
+  
+  /**
+   * Get the URL where some job progress information will be displayed.
+   * 
+   * @return the URL where some job progress information will be displayed.
+   */
+  public String getTrackingURL(){
+    ensureState(JobState.RUNNING);
+    return status.getTrackingUrl().toString();
+  }
+
+  /**
+   * Get the path of the submitted job configuration.
+   * 
+   * @return the path of the submitted job configuration.
+   */
+  public String getJobFile() {
+    ensureState(JobState.RUNNING);
+    return status.getJobFile();
+  }
+
+  /**
+   * Get start time of the job.
+   * 
+   * @return the start time of the job
+   */
+  public long getStartTime() {
+    ensureState(JobState.RUNNING);
+    return status.getStartTime();
+  }
+
+  /**
+   * Get finish time of the job.
+   * 
+   * @return the finish time of the job
+   */
+  public long getFinishTime() throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    updateStatus();
+    return status.getFinishTime();
+  }
+
+  /**
+   * Get scheduling info of the job.
+   * 
+   * @return the scheduling info of the job
+   */
+  public String getSchedulingInfo() {
+    ensureState(JobState.RUNNING);
+    return status.getSchedulingInfo();
+  }
+
+  /**
+   * Get the priority of the job.
+   * 
+   * @return the priority of the job
+   */
+  public JobPriority getPriority() throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    updateStatus();
+    return status.getPriority();
+  }
+
+  /**
+   * The user-specified job name.
+   */
+  public String getJobName() {
+    if (state == JobState.DEFINE) {
+      return super.getJobName();
+    }
+    ensureState(JobState.RUNNING);
+    return status.getJobName();
+  }
+
+  public String getHistoryUrl() throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    updateStatus();
+    return status.getHistoryFile();
+  }
+
+  public boolean isRetired() throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    updateStatus();
+    return status.isRetired();
+  }
+
+  /**
+   * A human-readable, multi-line summary of the job's current status.
+   */
+  @Override
+  public String toString() {
+    ensureState(JobState.RUNNING);
+    try {
+      updateStatus();
+    } catch (IOException e) {
+    } catch (InterruptedException ie) {
+    }
+    StringBuffer sb = new StringBuffer();
+    sb.append("Job: ").append(status.getJobID()).append("\n");
+    sb.append("Job File: ").append(status.getJobFile()).append("\n");
+    sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
+    sb.append("\n");
+    sb.append("map() completion: ");
+    sb.append(status.getMapProgress()).append("\n");
+    sb.append("reduce() completion: ");
+    sb.append(status.getReduceProgress()).append("\n");
+    sb.append("Job state: ");
+    sb.append(status.getState()).append("\n");
+    sb.append("history URL: ");
+    sb.append(status.getHistoryFile()).append("\n");
+    sb.append("retired: ").append(status.isRetired());
+    return sb.toString();
+  }
+      
+  /**
+   * Get the current state of the tasks of a job, for a given task type.
+   * 
+   * @param type the type of the task
+   * @return an array of {@link TaskReport}s for tasks of the given type
+   * @throws IOException
+   */
+  public TaskReport[] getTaskReports(TaskType type) 
+      throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    return cluster.getClient().getTaskReports(getID(), type);
+  }
+
+  /**
+   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0 
+   * and 1.0.  When all map tasks have completed, the function returns 1.0.
+   * 
+   * @return the progress of the job's map-tasks.
+   * @throws IOException
+   */
+  public float mapProgress() throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    ensureFreshStatus();
+    return status.getMapProgress();
+  }
+
+  /**
+   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0 
+   * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
+   * 
+   * @return the progress of the job's reduce-tasks.
+   * @throws IOException
+   */
+  public float reduceProgress() throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    ensureFreshStatus();
+    return status.getReduceProgress();
+  }
+
+  /**
+   * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0 
+   * and 1.0.  When all cleanup tasks have completed, the function returns 1.0.
+   * 
+   * @return the progress of the job's cleanup-tasks.
+   * @throws IOException
+   */
+  public float cleanupProgress() throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    ensureFreshStatus();
+    return status.getCleanupProgress();
+  }
+
+  /**
+   * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0 
+   * and 1.0.  When all setup tasks have completed, the function returns 1.0.
+   * 
+   * @return the progress of the job's setup-tasks.
+   * @throws IOException
+   */
+  public float setupProgress() throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    ensureFreshStatus();
+    return status.getSetupProgress();
+  }
+
+  /**
+   * Check if the job is finished or not. 
+   * This is a non-blocking call.
+   * 
+   * @return <code>true</code> if the job is complete, else <code>false</code>.
+   * @throws IOException
+   */
+  public boolean isComplete() throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    updateStatus();
+    return status.isJobComplete();
+  }
+
+  /**
+   * Check if the job completed successfully. 
+   * 
+   * @return <code>true</code> if the job succeeded, else <code>false</code>.
+   * @throws IOException
+   */
+  public boolean isSuccessful() throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    updateStatus();
+    return status.getState() == JobStatus.State.SUCCEEDED;
+  }
+
+  /**
+   * Kill the running job.  Blocks until all job tasks have been
+   * killed as well.  If the job is no longer running, it simply returns.
+   * 
+   * @throws IOException
+   */
+  public void killJob() throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    cluster.getClient().killJob(getID());
+  }
+
+  /**
+   * Set the priority of a running job.
+   * @param priority the new priority for the job.
+   * @throws IOException
+   */
+  public void setPriority(JobPriority priority) 
+      throws IOException, InterruptedException {
+    if (state == JobState.DEFINE) {
+      conf.setJobPriority(
+        org.apache.hadoop.mapred.JobPriority.valueOf(priority.name()));
+    } else {
+      ensureState(JobState.RUNNING);
+      cluster.getClient().setJobPriority(getID(), priority.toString());
+    }
+  }
+
+  /**
+   * Get events indicating completion (success/failure) of component tasks.
+   *  
+   * @param startFrom index to start fetching events from
+   * @param numEvents number of events to fetch
+   * @return an array of {@link TaskCompletionEvent}s
+   * @throws IOException
+   */
+  public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom,
+      int numEvents) throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    return cluster.getClient().getTaskCompletionEvents(getID(),
+      startFrom, numEvents); 
+  }
+  
+  /**
+   * Kill indicated task attempt.
+   * 
+   * @param taskId the id of the task to be terminated.
+   * @throws IOException
+   */
+  public boolean killTask(TaskAttemptID taskId) 
+      throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    return cluster.getClient().killTask(taskId, false);
+  }
+
+  /**
+   * Fail indicated task attempt.
+   * 
+   * @param taskId the id of the task to be terminated.
+   * @throws IOException
+   */
+  public boolean failTask(TaskAttemptID taskId) 
+      throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    return cluster.getClient().killTask(taskId, true);
+  }
+
+  /**
+   * Gets the counters for this job.
+   * 
+   * @return the counters for this job.
+   * @throws IOException
+   */
+  public Counters getCounters() 
+      throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    return cluster.getClient().getJobCounters(getID());
+  }
+
+  /**
+   * Gets the diagnostic messages for a given task attempt.
+   * @param taskid the id of the task attempt
+   * @return the list of diagnostic messages for the task
+   * @throws IOException
+   */
+  public String[] getTaskDiagnostics(TaskAttemptID taskid) 
+      throws IOException, InterruptedException {
+    ensureState(JobState.RUNNING);
+    return cluster.getClient().getTaskDiagnostics(taskid);
+  }
+
+  /**
    * Set the number of reduce tasks for the job.
    * @param tasks the number of reduce tasks
    * @throws IllegalStateException if the job is submitted
@@ -123,15 +569,26 @@
    * @param cls the example class
    */
   public void setJarByClass(Class<?> cls) {
+    ensureState(JobState.DEFINE);
     conf.setJarByClass(cls);
   }
-  
+
+  /**
+   * Set the job jar.
+   * @param jar path to the user jar for the map-reduce job
+   */
+  public void setJar(String jar) {
+    ensureState(JobState.DEFINE);
+    conf.setJar(jar);
+  }
+
   /**
-   * Get the pathname of the job's jar.
-   * @return the pathname
+   * Set the reported username for this job.
+   * 
+   * @param user the username for this job.
    */
-  public String getJar() {
-    return conf.getJar();
+  public void setUser(String user) {
+    ensureState(JobState.DEFINE);
+    conf.setUser(user);
   }
 
   /**
@@ -395,129 +852,38 @@
   }
 
   /**
-   * Get the URL where some job progress information will be displayed.
-   * 
-   * @return the URL where some job progress information will be displayed.
-   */
-  public String getTrackingURL() {
-    ensureState(JobState.RUNNING);
-    return info.getTrackingURL();
-  }
-
-  /**
-   * Get the job identifier.
-   * 
-   * @return the job identifier.
-   */
-  public JobID getID() {
-    ensureState(JobState.RUNNING);
-    return info.getID();
-  }
-  
-  /**
-   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0 
-   * and 1.0.  When all map tasks have completed, the function returns 1.0.
-   * 
-   * @return the progress of the job's map-tasks.
-   * @throws IOException
-   */
-  public float mapProgress() throws IOException {
-    ensureState(JobState.RUNNING);
-    return info.mapProgress();
-  }
-
-  /**
-   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0 
-   * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
-   * 
-   * @return the progress of the job's reduce-tasks.
-   * @throws IOException
-   */
-  public float reduceProgress() throws IOException {
-    ensureState(JobState.RUNNING);
-    return info.reduceProgress();
-  }
-
-  /**
-   * Check if the job is finished or not. 
-   * This is a non-blocking call.
-   * 
-   * @return <code>true</code> if the job is complete, else <code>false</code>.
-   * @throws IOException
-   */
-  public boolean isComplete() throws IOException {
-    ensureState(JobState.RUNNING);
-    return info.isComplete();
-  }
-
-  /**
-   * Check if the job completed successfully. 
-   * 
-   * @return <code>true</code> if the job succeeded, else <code>false</code>.
-   * @throws IOException
-   */
-  public boolean isSuccessful() throws IOException {
-    ensureState(JobState.RUNNING);
-    return info.isSuccessful();
-  }
-
-  /**
-   * Kill the running job.  Blocks until all job tasks have been
-   * killed as well.  If the job is no longer running, it simply returns.
-   * 
-   * @throws IOException
-   */
-  public void killJob() throws IOException {
-    ensureState(JobState.RUNNING);
-    info.killJob();
-  }
-    
-  /**
-   * Get events indicating completion (success/failure) of component tasks.
-   *  
-   * @param startFrom index to start fetching events from
-   * @return an array of {@link TaskCompletionEvent}s
-   * @throws IOException
-   */
-  public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom
-                                                       ) throws IOException {
-    ensureState(JobState.RUNNING);
-    return info.getTaskCompletionEvents(startFrom);
-  }
-  
-  /**
-   * Kill indicated task attempt.
-   * 
-   * @param taskId the id of the task to be terminated.
-   * @throws IOException
+   * Set whether the system should collect profiler information for some of 
+   * the tasks in this job. The information is stored in the user log 
+   * directory.
+   * @param newValue true means it should be gathered
    */
-  public void killTask(TaskAttemptID taskId) throws IOException {
-    ensureState(JobState.RUNNING);
-    info.killTask(org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId), 
-                  false);
+  public void setProfileEnabled(boolean newValue) {
+    ensureState(JobState.DEFINE);
+    conf.setProfileEnabled(newValue);
   }
 
   /**
-   * Fail indicated task attempt.
-   * 
-   * @param taskId the id of the task to be terminated.
-   * @throws IOException
+   * Set the profiler configuration arguments. If the string contains a '%s' it
+   * will be replaced with the name of the profiling output file when the task
+   * runs.
+   *
+   * This value is passed to the task child JVM on the command line.
+   *
+   * @param value the configuration string
    */
-  public void failTask(TaskAttemptID taskId) throws IOException {
-    ensureState(JobState.RUNNING);
-    info.killTask(org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId), 
-                  true);
+  public void setProfileParams(String value) {
+    ensureState(JobState.DEFINE);
+    conf.setProfileParams(value);
   }
 
   /**
-   * Gets the counters for this job.
-   * 
-   * @return the counters for this job.
-   * @throws IOException
+   * Set the ranges of maps or reduces to profile. setProfileEnabled(true) 
+   * must also be called.
+   * @param isMap whether the ranges apply to map tasks (true) or reduce tasks
+   * @param newValue a set of integer ranges of the task ids
    */
-  public Counters getCounters() throws IOException {
-    ensureState(JobState.RUNNING);
-    return new Counters(info.getCounters());
+  public void setProfileTaskRange(boolean isMap, String newValue) {
+    ensureState(JobState.DEFINE);
+    conf.setProfileTaskRange(isMap, newValue);
   }
 
   private void ensureNotSet(String attr, String msg) throws IOException {
@@ -579,7 +945,8 @@
                               ClassNotFoundException {
     ensureState(JobState.DEFINE);
     setUseNewAPI();
-    info = jobClient.submitJobInternal(conf);
+    status = new JobSubmitter(cluster.getFileSystem(),
+      cluster.getClient()).submitJobInternal(this);
     state = JobState.RUNNING;
    }
   
@@ -597,11 +964,343 @@
       submit();
     }
     if (verbose) {
-      jobClient.monitorAndPrintJob(conf, info);
+      monitorAndPrintJob();
     } else {
-      info.waitForCompletion();
+      // get the completion poll interval from the client.
+      int completionPollIntervalMillis = 
+        Job.getCompletionPollInterval(cluster.getConf());
+      while (!isComplete()) {
+        try {
+          Thread.sleep(completionPollIntervalMillis);
+        } catch (InterruptedException ie) {
+        }
+      }
+    }
+    return isSuccessful();
+  }
+  
+  /**
+   * Monitor a job and print status in real-time as progress is made and tasks 
+   * fail.
+   * @return true if the job succeeded
+   * @throws IOException if communication with the JobTracker fails
+   */
+  public boolean monitorAndPrintJob() 
+      throws IOException, InterruptedException {
+    String lastReport = null;
+    Job.TaskStatusFilter filter;
+    Configuration clientConf = cluster.getConf();
+    filter = Job.getTaskOutputFilter(clientConf);
+    JobID jobId = getID();
+    LOG.info("Running job: " + jobId);
+    int eventCounter = 0;
+    boolean profiling = getProfileEnabled();
+    IntegerRanges mapRanges = getProfileTaskRange(true);
+    IntegerRanges reduceRanges = getProfileTaskRange(false);
+    int progMonitorPollIntervalMillis = 
+      Job.getProgressPollInterval(clientConf);
+    while (!isComplete()) {
+      Thread.sleep(progMonitorPollIntervalMillis);
+      String report = 
+        (" map " + StringUtils.formatPercent(mapProgress(), 0)+
+            " reduce " + 
+            StringUtils.formatPercent(reduceProgress(), 0));
+      if (!report.equals(lastReport)) {
+        LOG.info(report);
+        lastReport = report;
+      }
+
+      TaskCompletionEvent[] events = 
+        getTaskCompletionEvents(eventCounter, 10); 
+      eventCounter += events.length;
+      printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
+    }
+    LOG.info("Job complete: " + jobId);
+    Counters counters = getCounters();
+    if (counters != null) {
+      LOG.info(counters.toString());
     }
     return isSuccessful();
   }
+
+  private void printTaskEvents(TaskCompletionEvent[] events,
+      Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
+      IntegerRanges reduceRanges) throws IOException, InterruptedException {
+    for (TaskCompletionEvent event : events) {
+      TaskCompletionEvent.Status status = event.getStatus();
+      if (profiling && 
+         (status == TaskCompletionEvent.Status.SUCCEEDED ||
+            status == TaskCompletionEvent.Status.FAILED) &&
+            (event.isMapTask() ? mapRanges : reduceRanges).
+              isIncluded(event.idWithinJob())) {
+        downloadProfile(event);
+      }
+      switch (filter) {
+      case NONE:
+        break;
+      case SUCCEEDED:
+        if (event.getStatus() == 
+          TaskCompletionEvent.Status.SUCCEEDED) {
+          LOG.info(event.toString());
+          displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
+        }
+        break; 
+      case FAILED:
+        if (event.getStatus() == 
+          TaskCompletionEvent.Status.FAILED) {
+          LOG.info(event.toString());
+          // Displaying the task diagnostic information
+          TaskAttemptID taskId = event.getTaskAttemptId();
+          String[] taskDiagnostics = getTaskDiagnostics(taskId); 
+          if (taskDiagnostics != null) {
+            for (String diagnostics : taskDiagnostics) {
+              System.err.println(diagnostics);
+            }
+          }
+          // Displaying the task logs
+          displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
+        }
+        break; 
+      case KILLED:
+        if (event.getStatus() == TaskCompletionEvent.Status.KILLED){
+          LOG.info(event.toString());
+        }
+        break; 
+      case ALL:
+        LOG.info(event.toString());
+        displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
+        break;
+      }
+    }
+  }
+  
+  private void downloadProfile(TaskCompletionEvent e) throws IOException  {
+    URLConnection connection = new URL(
+      getTaskLogURL(e.getTaskAttemptId(), e.getTaskTrackerHttp()) + 
+      "&filter=profile").openConnection();
+    InputStream in = connection.getInputStream();
+    OutputStream out = new FileOutputStream(e.getTaskAttemptId() + ".profile");
+    IOUtils.copyBytes(in, out, 64 * 1024, true);
+  }
+  
+  private void displayTaskLogs(TaskAttemptID taskId, String baseUrl)
+      throws IOException {
+    // The tasktracker for a 'failed/killed' job might not be around...
+    if (baseUrl != null) {
+      // Construct the url for the tasklogs
+      String taskLogUrl = getTaskLogURL(taskId, baseUrl);
+      
+      // Copy task's stdout to stdout of the JobClient
+      getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stdout"), System.out);
+        
+      // Copy task's stderr to stderr of the JobClient 
+      getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stderr"), System.err);
+    }
+  }
+    
+  private void getTaskLogs(TaskAttemptID taskId, URL taskLogUrl, 
+                           OutputStream out) {
+    try {
+      URLConnection connection = taskLogUrl.openConnection();
+      BufferedReader input = 
+        new BufferedReader(new InputStreamReader(connection.getInputStream()));
+      BufferedWriter output = 
+        new BufferedWriter(new OutputStreamWriter(out));
+      try {
+        String logData = null;
+        while ((logData = input.readLine()) != null) {
+          if (logData.length() > 0) {
+            output.write(taskId + ": " + logData + "\n");
+            output.flush();
+          }
+        }
+      } finally {
+        input.close();
+      }
+    } catch(IOException ioe) {
+      LOG.warn("Error reading task output" + ioe.getMessage()); 
+    }
+  }
   
+  private String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
+    return (baseUrl + "/tasklog?plaintext=true&taskid=" + taskId); 
+  }
+
+  /**
+   * Set the UGI, user name and the group name for the job.
+   * 
+   * This method is called by job submission code while submitting the job.
+   * Internal to MapReduce project. 
+   * @throws IOException
+   */
+  public void setUGIAndUserGroupNames()
+      throws IOException {
+    UnixUserGroupInformation ugi = Job.getUGI(conf);
+    setUser(ugi.getUserName());
+    if (ugi.getGroupNames().length > 0) {
+      conf.set("group.name", ugi.getGroupNames()[0]);
+    }
+  }
+
+  /** The interval at which monitorAndPrintJob() prints status */
+  public static int getProgressPollInterval(Configuration conf) {
+    // Read progress monitor poll interval from config. Default is 1 second.
+    int progMonitorPollIntervalMillis = conf.getInt(
+      PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
+    if (progMonitorPollIntervalMillis < 1) {
+      LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY + 
+        " has been set to an invalid value; "
+        + " replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
+      progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
+    }
+    return progMonitorPollIntervalMillis;
+  }
+
+  /** The interval at which waitForCompletion() should check. */
+  public static int getCompletionPollInterval(Configuration conf) {
+    int completionPollIntervalMillis = conf.getInt(
+      COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
+    if (completionPollIntervalMillis < 1) { 
+      LOG.warn(COMPLETION_POLL_INTERVAL_KEY + 
+       " has been set to an invalid value; "
+       + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
+      completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
+    }
+    return completionPollIntervalMillis;
+  }
+
+  /**
+   * Get the task output filter.
+   * 
+   * @param conf the configuration.
+   * @return the filter level.
+   */
+  public static TaskStatusFilter getTaskOutputFilter(Configuration conf) {
+    return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED"));
+  }
+
+  /**
+   * Modify the Configuration to set the task output filter.
+   * 
+   * @param conf the Configuration to modify.
+   * @param newValue the value to set.
+   */
+  public static void setTaskOutputFilter(Configuration conf, 
+      TaskStatusFilter newValue) {
+    conf.set(Job.OUTPUT_FILTER, newValue.toString());
+  }
+
+  public static UnixUserGroupInformation getUGI(Configuration job) 
+      throws IOException {
+    UnixUserGroupInformation ugi = null;
+    try {
+      ugi = UnixUserGroupInformation.login(job, true);
+    } catch (LoginException e) {
+      throw (IOException)(new IOException(
+        "Failed to get the current user's information.").initCause(e));
+    }
+    return ugi;
+  }
+
+  /**
+   * Read a splits file into a list of raw splits.
+   * 
+   * @param in the stream to read from
+   * @return the complete list of splits
+   * @throws IOException
+   */
+  public static RawSplit[] readSplitFile(DataInput in) throws IOException {
+    byte[] header = new byte[JobSubmitter.SPLIT_FILE_HEADER.length];
+    in.readFully(header);
+    if (!Arrays.equals(JobSubmitter.SPLIT_FILE_HEADER, header)) {
+      throw new IOException("Invalid header on split file");
+    }
+    int vers = WritableUtils.readVInt(in);
+    if (vers != JobSubmitter.CURRENT_SPLIT_FILE_VERSION) {
+      throw new IOException("Unsupported split version " + vers);
+    }
+    int len = WritableUtils.readVInt(in);
+    RawSplit[] result = new RawSplit[len];
+    for (int i=0; i < len; ++i) {
+      result[i] = new RawSplit();
+      result[i].readFields(in);
+    }
+    return result;
+  }
+
+  public static class RawSplit implements Writable {
+    private String splitClass;
+    private BytesWritable bytes = new BytesWritable();
+    private String[] locations;
+    long dataLength;
+
+    public RawSplit() {
+    }
+    
+    protected RawSplit(String splitClass, BytesWritable bytes,
+        String[] locations, long dataLength) {
+      this.splitClass = splitClass;
+      this.bytes = bytes;
+      this.locations = locations;
+      this.dataLength = dataLength;
+    }
+
+    public void setBytes(byte[] data, int offset, int length) {
+      bytes.set(data, offset, length);
+    }
+
+    public void setClassName(String className) {
+      splitClass = className;
+    }
+      
+    public String getClassName() {
+      return splitClass;
+    }
+      
+    public BytesWritable getBytes() {
+      return bytes;
+    }
+
+    public void clearBytes() {
+      bytes = null;
+    }
+      
+    public void setLocations(String[] locations) {
+      this.locations = locations;
+    }
+      
+    public String[] getLocations() {
+      return locations;
+    }
+
+    public long getDataLength() {
+      return dataLength;
+    }
+
+    public void setDataLength(long l) {
+      dataLength = l;
+    }
+      
+    public void readFields(DataInput in) throws IOException {
+      splitClass = Text.readString(in);
+      dataLength = in.readLong();
+      bytes.readFields(in);
+      int len = WritableUtils.readVInt(in);
+      locations = new String[len];
+      for (int i=0; i < len; ++i) {
+        locations[i] = Text.readString(in);
+      }
+    }
+      
+    public void write(DataOutput out) throws IOException {
+      Text.writeString(out, splitClass);
+      out.writeLong(dataLength);
+      bytes.write(out);
+      WritableUtils.writeVInt(out, locations.length);
+      for (int i = 0; i < locations.length; i++) {
+        Text.writeString(out, locations[i]);
+      }        
+    }
+  }
+
 }

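A sketch of the client-side monitoring knobs added to Job above. The SubmitAndMonitorExample wrapper, the interval values and the job name are illustrative; the configuration keys, TaskStatusFilter, getInstance() and waitForCompletion() are the ones defined in the diff.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Cluster;
    import org.apache.hadoop.mapreduce.Job;

    public class SubmitAndMonitorExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // How often waitForCompletion() polls for completion (default 5000 ms)
        // and how often monitorAndPrintJob() polls for progress (default 1000 ms).
        conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 10000);
        conf.setInt(Job.PROGRESS_MONITOR_POLL_INTERVAL_KEY, 2000);
        // Which task-completion events get their logs printed (default FAILED).
        Job.setTaskOutputFilter(conf, Job.TaskStatusFilter.ALL);

        Job job = Job.getInstance(new Cluster(conf), conf);
        job.setJarByClass(SubmitAndMonitorExample.class);
        job.setJobName("poll-interval-example");
        // ... mapper, reducer, input and output settings as in the class javadoc ...

        // verbose=true submits and then runs monitorAndPrintJob() with the
        // intervals and filter configured above.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }
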
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=816496&r1=816495&r2=816496&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java Fri Sep 18 07:04:42 2009
@@ -22,6 +22,7 @@
 import java.net.URI;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -246,6 +247,44 @@
   }
 
   /**
+   * Get whether the task profiling is enabled.
+   * @return true if some tasks will be profiled
+   */
+  public boolean getProfileEnabled() {
+    return conf.getProfileEnabled();
+  }
+
+  /**
+   * Get the profiler configuration arguments.
+   *
+   * The default value for this property is
+   * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s"
+   * 
+   * @return the parameters to pass to the task child to configure profiling
+   */
+  public String getProfileParams() {
+    return conf.getProfileParams();
+  }
+
+  /**
+   * Get the range of maps or reduces to profile.
+   * @param isMap is the task a map?
+   * @return the task ranges
+   */
+  public IntegerRanges getProfileTaskRange(boolean isMap) {
+    return conf.getProfileTaskRange(isMap);
+  }
+
+  /**
+   * Get the reported username for this job.
+   * 
+   * @return the username
+   */
+  public String getUser() {
+    return conf.getUser();
+  }
+  
+  /**
   * This method checks to see if symlinks are to be created for the 
   * localized cache files in the current working directory 
   * @return true if symlinks are to be created, else false

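A sketch of how the profiling accessors added to JobContext above pair with the setters added to Job in this change. The ProfilingConfigExample wrapper and the "0-2"/"0" ranges are illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configuration.IntegerRanges;
    import org.apache.hadoop.mapreduce.Cluster;
    import org.apache.hadoop.mapreduce.Job;

    public class ProfilingConfigExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(new Cluster(conf), conf);

        // Setters live on Job and only work while the job is in DEFINE state.
        job.setProfileEnabled(true);
        job.setProfileTaskRange(true, "0-2");  // profile map tasks 0, 1 and 2
        job.setProfileTaskRange(false, "0");   // profile reduce task 0

        // Getters live on JobContext, so the same values are visible to the
        // framework and to monitorAndPrintJob() when it downloads profiles.
        System.out.println("Profiling enabled: " + job.getProfileEnabled());
        IntegerRanges maps = job.getProfileTaskRange(true);
        System.out.println("Profiled map ids : " + maps);
        System.out.println("Profiler args    : " + job.getProfileParams());
      }
    }
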
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobPriority.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobPriority.java?rev=816496&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobPriority.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobPriority.java Fri Sep 18 07:04:42 2009
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+/**
+ * Used to describe the priority of the running job. 
+ *
+ */
+public enum JobPriority {
+  VERY_HIGH,
+  HIGH,
+  NORMAL,
+  LOW,
+  VERY_LOW;
+}

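A sketch of using the new JobPriority enum with Job#setPriority(), before and after submission. The JobPriorityExample wrapper and the command-line job id are illustrative; JobID.forName() is the existing parser for "job_..." id strings.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Cluster;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.JobPriority;

    public class JobPriorityExample {
      public static void main(String[] args) throws Exception {
        Cluster cluster = new Cluster(new Configuration());
        try {
          // Before submission the priority is recorded in the job configuration.
          Job unsubmitted = Job.getInstance(cluster, new Configuration());
          unsubmitted.setPriority(JobPriority.HIGH);

          // After submission setPriority() goes through the ClientProtocol.
          Job running = cluster.getJob(JobID.forName(args[0]));
          if (running != null) {
            running.setPriority(JobPriority.VERY_HIGH);
          }
        } finally {
          cluster.close();
        }
      }
    }
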
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobStatus.java?rev=816496&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobStatus.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobStatus.java Fri Sep 18 07:04:42 2009
@@ -0,0 +1,398 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.io.WritableUtils;
+
+/**************************************************
+ * Describes the current status of a job.
+ **************************************************/
+public class JobStatus implements Writable, Cloneable {
+
+  static {                                      // register a ctor
+    WritableFactories.setFactory
+      (JobStatus.class,
+       new WritableFactory() {
+         public Writable newInstance() { return new JobStatus(); }
+       });
+  }
+
+  /**
+   * Current state of the job 
+   */
+  public static enum State {
+    RUNNING(1),
+    SUCCEEDED(2),
+    FAILED(3),
+    PREP(4),
+    KILLED(5);
+    
+    int value;
+    
+    State(int value) {
+      this.value = value;
+    }
+    
+    public int getValue() {
+      return value; 
+    }
+    
+  };
+  
+  private JobID jobid;
+  private float mapProgress;
+  private float reduceProgress;
+  private float cleanupProgress;
+  private float setupProgress;
+  private State runState;
+  private long startTime;
+  private String user;
+  private JobPriority priority;
+  private String schedulingInfo="NA";
+
+  private String jobName;
+  private String jobFile;
+  private long finishTime;
+  private boolean isRetired;
+  private String historyFile = "";
+  private String trackingUrl ="";
+
+    
+  /**
+   * Default constructor, used when deserializing a <code>JobStatus</code>.
+   */
+  public JobStatus() {
+  }
+
+  /**
+   * Create a job status object for a given jobid.
+   * @param jobid The jobid of the job
+   * @param setupProgress The progress made on the setup
+   * @param mapProgress The progress made on the maps
+   * @param reduceProgress The progress made on the reduces
+   * @param cleanupProgress The progress made on the cleanup
+   * @param runState The current state of the job
+   * @param jp Priority of the job.
+   * @param user userid of the person who submitted the job.
+   * @param jobName user-specified job name.
+   * @param jobFile job configuration file. 
+   * @param trackingUrl link to the web-ui for details of the job.
+   */
+   public JobStatus(JobID jobid, float setupProgress, float mapProgress,
+                    float reduceProgress, float cleanupProgress, 
+                    State runState, JobPriority jp, String user, String jobName, 
+                    String jobFile, String trackingUrl) {
+     this.jobid = jobid;
+     this.setupProgress = setupProgress;
+     this.mapProgress = mapProgress;
+     this.reduceProgress = reduceProgress;
+     this.cleanupProgress = cleanupProgress;
+     this.runState = runState;
+     this.user = user;
+     if (jp == null) {
+       throw new IllegalArgumentException("Job Priority cannot be null.");
+     }
+     priority = jp;
+     this.jobName = jobName;
+     this.jobFile = jobFile;
+     this.trackingUrl = trackingUrl;
+   }
+   
+  /**
+   * Sets the map progress of this job
+   * @param p The value of map progress to set to
+   */
+  protected synchronized void setMapProgress(float p) { 
+    this.mapProgress = (float) Math.min(1.0, Math.max(0.0, p)); 
+  }
+
+  /**
+   * Sets the cleanup progress of this job
+   * @param p The value of cleanup progress to set to
+   */
+  protected synchronized void setCleanupProgress(float p) { 
+    this.cleanupProgress = (float) Math.min(1.0, Math.max(0.0, p)); 
+  }
+
+  /**
+   * Sets the setup progress of this job
+   * @param p The value of setup progress to set to
+   */
+  protected synchronized void setSetupProgress(float p) { 
+    this.setupProgress = (float) Math.min(1.0, Math.max(0.0, p)); 
+  }
+
+  /**
+   * Sets the reduce progress of this Job
+   * @param p The value of reduce progress to set to
+   */
+  protected synchronized void setReduceProgress(float p) { 
+    this.reduceProgress = (float) Math.min(1.0, Math.max(0.0, p)); 
+  }
+    
+  /**
+   * Set the priority of the job, defaulting to NORMAL.
+   * @param jp new job priority
+   */
+  protected synchronized void setPriority(JobPriority jp) {
+    if (jp == null) {
+      throw new IllegalArgumentException("Job priority cannot be null.");
+    }
+    priority = jp;
+  }
+  
+  /** 
+   * Set the finish time of the job
+   * @param finishTime The finishTime of the job
+   */
+  protected synchronized void setFinishTime(long finishTime) {
+    this.finishTime = finishTime;
+  }
+
+  /**
+   * Set the job history file url for a completed job
+   */
+  protected synchronized void setHistoryFile(String historyFile) {
+    this.historyFile = historyFile;
+  }
+
+  /**
+   * Set the link to the web-ui for details of the job.
+   */
+  protected synchronized void setTrackingUrl(String trackingUrl) {
+    this.trackingUrl = trackingUrl;
+  }
+
+  /**
+   * Set the job retire flag to true.
+   */
+  protected synchronized void setRetired() {
+    this.isRetired = true;
+  }
+
+  /**
+   * Change the current run state of the job.
+   */
+  protected synchronized void setState(State state) {
+    this.runState = state;
+  }
+
+  /** 
+   * Set the start time of the job
+   * @param startTime The startTime of the job
+   */
+  protected synchronized void setStartTime(long startTime) { 
+    this.startTime = startTime;
+  }
+    
+  /**
+   * Set the username of the job.
+   * @param userName The username of the job
+   */
+  protected synchronized void setUsername(String userName) { 
+    this.user = userName;
+  }
+
+  /**
+   * Used to set the scheduling information associated with a particular Job.
+   * 
+   * @param schedulingInfo Scheduling information of the job
+   */
+  protected synchronized void setSchedulingInfo(String schedulingInfo) {
+    this.schedulingInfo = schedulingInfo;
+  }
+  
+  /**
+   * @return Percentage of progress in maps 
+   */
+  public synchronized float getMapProgress() { return mapProgress; }
+    
+  /**
+   * @return Percentage of progress in cleanup 
+   */
+  public synchronized float getCleanupProgress() { return cleanupProgress; }
+    
+  /**
+   * @return Percentage of progress in setup 
+   */
+  public synchronized float getSetupProgress() { return setupProgress; }
+    
+  /**
+   * @return Percentage of progress in reduce 
+   */
+  public synchronized float getReduceProgress() { return reduceProgress; }
+    
+  /**
+   * @return running state of the job
+   */
+  public synchronized State getState() { return runState; }
+    
+  /**
+   * @return start time of the job
+   */
+  public synchronized long getStartTime() { return startTime; }
+
+  @Override
+  public Object clone() {
+    try {
+      return super.clone();
+    } catch (CloneNotSupportedException cnse) {
+      // Shouldn't happen since we do implement Cloneable
+      throw new InternalError(cnse.toString());
+    }
+  }
+  
+  /**
+   * @return The jobid of the Job
+   */
+  public JobID getJobID() { return jobid; }
+    
+  /**
+   * @return the username of the job
+   */
+  public synchronized String getUsername() { return this.user;}
+  
+  /**
+   * Gets the scheduling information associated with a particular Job.
+   * @return the scheduling information of the job
+   */
+  public synchronized String getSchedulingInfo() {
+    return schedulingInfo;
+  }
+
+  /**
+   * Return the priority of the job
+   * @return job priority
+   */
+  public synchronized JobPriority getPriority() { return priority; }
+  
+  /**
+   * Returns true if the status is for a completed job.
+   */
+  public synchronized boolean isJobComplete() {
+    return (runState == JobStatus.State.SUCCEEDED || 
+            runState == JobStatus.State.FAILED || 
+            runState == JobStatus.State.KILLED);
+  }
+
+  ///////////////////////////////////////
+  // Writable
+  ///////////////////////////////////////
+  public synchronized void write(DataOutput out) throws IOException {
+    jobid.write(out);
+    out.writeFloat(setupProgress);
+    out.writeFloat(mapProgress);
+    out.writeFloat(reduceProgress);
+    out.writeFloat(cleanupProgress);
+    WritableUtils.writeEnum(out, runState);
+    out.writeLong(startTime);
+    Text.writeString(out, user);
+    WritableUtils.writeEnum(out, priority);
+    Text.writeString(out, schedulingInfo);
+    out.writeLong(finishTime);
+    out.writeBoolean(isRetired);
+    Text.writeString(out, historyFile);
+    Text.writeString(out, jobName);
+    Text.writeString(out, trackingUrl);
+    Text.writeString(out, jobFile);
+  }
+
+  public synchronized void readFields(DataInput in) throws IOException {
+    this.jobid = new JobID();
+    this.jobid.readFields(in);
+    this.setupProgress = in.readFloat();
+    this.mapProgress = in.readFloat();
+    this.reduceProgress = in.readFloat();
+    this.cleanupProgress = in.readFloat();
+    this.runState = WritableUtils.readEnum(in, State.class);
+    this.startTime = in.readLong();
+    this.user = Text.readString(in);
+    this.priority = WritableUtils.readEnum(in, JobPriority.class);
+    this.schedulingInfo = Text.readString(in);
+    this.finishTime = in.readLong();
+    this.isRetired = in.readBoolean();
+    this.historyFile = Text.readString(in);
+    this.jobName = Text.readString(in);
+    this.trackingUrl = Text.readString(in);
+    this.jobFile = Text.readString(in);
+  }
+
+  /**
+   * Get the user-specified job name.
+   */
+  public String getJobName() {
+    return jobName;
+  }
+
+  /**
+   * Get the configuration file for the job.
+   */
+  public String getJobFile() {
+    return jobFile;
+  }
+
+  /**
+   * Get the link to the web-ui for details of the job.
+   */
+  public synchronized String getTrackingUrl() {
+    return trackingUrl;
+  }
+
+  /**
+   * Get the finish time of the job.
+   */
+  public synchronized long getFinishTime() { 
+    return finishTime;
+  }
+
+  /**
+   * Check whether the job has retired.
+   */
+  public synchronized boolean isRetired() {
+    return isRetired;
+  }
+
+  /**
+   * @return the job history file name for a completed job, or null if the
+   * job has not completed or the history file is not available.
+   */
+  public synchronized String getHistoryFile() {
+    return historyFile;
+  }
+
+  public String toString() {
+    // separate the fields so that the rendered status is readable
+    StringBuffer buffer = new StringBuffer();
+    buffer.append("job-id : " + jobid);
+    buffer.append(", map-progress : " + mapProgress);
+    buffer.append(", reduce-progress : " + reduceProgress);
+    buffer.append(", cleanup-progress : " + cleanupProgress);
+    buffer.append(", setup-progress : " + setupProgress);
+    buffer.append(", runstate : " + runState);
+    buffer.append(", start-time : " + startTime);
+    buffer.append(", user-name : " + user);
+    buffer.append(", priority : " + priority);
+    buffer.append(", scheduling-info : " + schedulingInfo);
+    return buffer.toString();
+  }
+}
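
For readers tracking the new org.apache.hadoop.mapreduce client API, here is a minimal, illustrative sketch (not part of this patch) of how client code might poll the JobStatus accessors added above. The Job#getStatus() call used here is an assumption made purely for illustration; only the JobStatus methods themselves come from the patch.

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.JobStatus;

    public class StatusPoller {
      public static void waitForCompletion(Job job) throws Exception {
        // job.getStatus() is assumed for this sketch; it is not defined in this patch
        JobStatus status = job.getStatus();
        while (!status.isJobComplete()) {
          // progress values are clamped to [0.0, 1.0] by the setters above
          System.out.printf("map %.0f%% reduce %.0f%% (%s)%n",
              status.getMapProgress() * 100,
              status.getReduceProgress() * 100,
              status.getState());
          Thread.sleep(5000);            // poll every five seconds
          status = job.getStatus();
        }
        System.out.println("Finished in state " + status.getState()
            + "; details at " + status.getTrackingUrl());
      }
    }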

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=816496&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java Fri Sep 18 07:04:42 2009
@@ -0,0 +1,489 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.util.ReflectionUtils;
+
+class JobSubmitter {
+  protected static final Log LOG = LogFactory.getLog(JobSubmitter.class);
+  private FileSystem jtFs;
+  private ClientProtocol submitClient;
+  
+  JobSubmitter(FileSystem submitFs, ClientProtocol submitClient) 
+  throws IOException {
+    this.submitClient = submitClient;
+    this.jtFs = submitFs;
+  }
+  /*
+   * See whether two file systems are the same.
+   */
+  private boolean compareFs(FileSystem srcFs, FileSystem destFs) {
+    URI srcUri = srcFs.getUri();
+    URI dstUri = destFs.getUri();
+    if (srcUri.getScheme() == null) {
+      return false;
+    }
+    if (!srcUri.getScheme().equals(dstUri.getScheme())) {
+      return false;
+    }
+    String srcHost = srcUri.getHost();    
+    String dstHost = dstUri.getHost();
+    if ((srcHost != null) && (dstHost != null)) {
+      try {
+        srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
+        dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
+      } catch(UnknownHostException ue) {
+        return false;
+      }
+      if (!srcHost.equals(dstHost)) {
+        return false;
+      }
+    } else if (srcHost == null && dstHost != null) {
+      return false;
+    } else if (srcHost != null && dstHost == null) {
+      return false;
+    }
+    //check for ports
+    if (srcUri.getPort() != dstUri.getPort()) {
+      return false;
+    }
+    return true;
+  }
+
+  // copies a file to the jobtracker filesystem and returns the path where it
+  // was copied to
+  private Path copyRemoteFiles(Path parentDir,
+      Path originalPath, Configuration conf, short replication) 
+      throws IOException {
+    // Check whether the file needs to be copied at all: if the JobTracker
+    // already uses the same file system as the original path, the file can
+    // be used in place. compareFs() matches scheme, canonical host and port,
+    // which is a cheap check rather than a perfect identity test.
+    
+    FileSystem remoteFs = null;
+    remoteFs = originalPath.getFileSystem(conf);
+    if (compareFs(remoteFs, jtFs)) {
+      return originalPath;
+    }
+    // Copy into parentDir under the original file name; name collisions are
+    // possible and FileUtil.copy will throw an exception if one occurs.
+    Path newPath = new Path(parentDir, originalPath.getName());
+    FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, conf);
+    jtFs.setReplication(newPath, replication);
+    return newPath;
+  }
+
+  // configures -files, -libjars and -archives.
+  private void copyAndConfigureFiles(Job job, Path submitJobDir,
+      short replication) throws IOException {
+    Configuration conf = job.getConfiguration();
+    if (!(conf.getBoolean("mapred.used.genericoptionsparser", false))) {
+      LOG.warn("Use GenericOptionsParser for parsing the arguments. " +
+               "Applications should implement Tool for the same.");
+    }
+
+    // get all the command line arguments passed in by the user conf
+    String files = conf.get("tmpfiles");
+    String libjars = conf.get("tmpjars");
+    String archives = conf.get("tmparchives");
+      
+    /*
+     * set this user's id in job configuration, so later job files can be
+     * accessed using this user's id
+     */
+    job.setUGIAndUserGroupNames();
+
+    //
+    // Figure out what fs the JobTracker is using.  Copy the
+    // job to it, under a temporary name.  This allows DFS to work,
+    // and under the local fs also provides UNIX-like object loading 
+    // semantics.  (that is, if the job file is deleted right after
+    // submission, we can still run the submission to completion)
+    //
+
+    // Create a number of filenames in the JobTracker's fs namespace
+    LOG.debug("default FileSystem: " + jtFs.getUri());
+    jtFs.delete(submitJobDir, true);
+    submitJobDir = jtFs.makeQualified(submitJobDir);
+    submitJobDir = new Path(submitJobDir.toUri().getPath());
+    FsPermission mapredSysPerms = new FsPermission(JOB_DIR_PERMISSION);
+    FileSystem.mkdirs(jtFs, submitJobDir, mapredSysPerms);
+    Path filesDir = new Path(submitJobDir, "files");
+    Path archivesDir = new Path(submitJobDir, "archives");
+    Path libjarsDir = new Path(submitJobDir, "libjars");
+    // add all the command-line files, jars and archives,
+    // copying them to the JobTracker's filesystem first
+      
+    if (files != null) {
+      FileSystem.mkdirs(jtFs, filesDir, mapredSysPerms);
+      String[] fileArr = files.split(",");
+      for (String tmpFile: fileArr) {
+        Path tmp = new Path(tmpFile);
+        Path newPath = copyRemoteFiles(filesDir, tmp, conf, replication);
+        try {
+          URI pathURI = new URI(newPath.toUri().toString() + "#" 
+                          + newPath.getName());
+          DistributedCache.addCacheFile(pathURI, conf);
+        } catch(URISyntaxException ue) {
+          // should not throw a URI exception
+          throw new IOException("Failed to create uri for " + tmpFile);
+        }
+        DistributedCache.createSymlink(conf);
+      }
+    }
+      
+    if (libjars != null) {
+      FileSystem.mkdirs(jtFs, libjarsDir, mapredSysPerms);
+      String[] libjarsArr = libjars.split(",");
+      for (String tmpjars: libjarsArr) {
+        Path tmp = new Path(tmpjars);
+        Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, replication);
+        DistributedCache.addFileToClassPath(newPath, conf);
+      }
+    }
+      
+    if (archives != null) {
+      FileSystem.mkdirs(jtFs, archivesDir, mapredSysPerms); 
+      String[] archivesArr = archives.split(",");
+      for (String tmpArchives: archivesArr) {
+        Path tmp = new Path(tmpArchives);
+        Path newPath = copyRemoteFiles(archivesDir, tmp, conf,
+          replication);
+        try {
+          URI pathURI = new URI(newPath.toUri().toString() + "#" 
+                          + newPath.getName());
+          DistributedCache.addCacheArchive(pathURI, conf);
+        } catch(URISyntaxException ue) {
+          // should not throw a URI exception
+          throw new IOException("Failed to create uri for " + tmpArchives);
+        }
+        DistributedCache.createSymlink(conf);
+      }
+    }
+      
+    //  set the timestamps of the archives and files
+    TrackerDistributedCacheManager.determineTimestamps(conf);
+  }
+
+  private void copyJar(Path originalJarPath, Path submitJarFile,
+      short replication) throws IOException {
+    jtFs.copyFromLocalFile(originalJarPath, submitJarFile);
+    jtFs.setReplication(submitJarFile, replication);
+    jtFs.setPermission(submitJarFile, new FsPermission(JOB_FILE_PERMISSION));
+  }
+  /**
+   * Configure the user's job with the command line options
+   * -libjars, -files and -archives.
+   * @param job the job being submitted
+   * @param submitJobDir the job's staging directory on the JobTracker's fs
+   * @param submitJarFile the destination path for the job jar
+   * @throws IOException
+   */
+  private void configureCommandLineOptions(Job job, Path submitJobDir,
+      Path submitJarFile) throws IOException {
+    Configuration conf = job.getConfiguration();
+    short replication = (short)conf.getInt("mapred.submit.replication", 10);
+    copyAndConfigureFiles(job, submitJobDir, replication);
+    
+    // Copy the job jar (if any) to the JobTracker's file system and point
+    // the job at the copied location.
+    String originalJarPath = job.getJar();
+
+    if (originalJarPath != null) {           // copy jar to JobTracker's fs
+      // use jar name if job is not named. 
+      if ("".equals(job.getJobName())){
+        job.setJobName(new Path(originalJarPath).getName());
+      }
+      job.setJar(submitJarFile.toString());
+      copyJar(new Path(originalJarPath), submitJarFile, replication);
+    } else {
+      LOG.warn("No job jar file set.  User classes may not be found. "+
+               "See Job or Job#setJar(String).");
+    }
+
+    // Set the working directory
+    if (job.getWorkingDirectory() == null) {
+      job.setWorkingDirectory(jtFs.getWorkingDirectory());          
+    }
+
+  }
+
+  // job files are world-readable and owner-writable
+  final private static FsPermission JOB_FILE_PERMISSION = 
+    FsPermission.createImmutable((short) 0644); // rw-r--r--
+
+  // job submission directory is world readable/writable/executable
+  final static FsPermission JOB_DIR_PERMISSION =
+    FsPermission.createImmutable((short) 0777); // rwxrwxrwx
+   
+  /**
+   * Internal method for submitting jobs to the system.
+   * 
+   * <p>The job submission process involves:
+   * <ol>
+   *   <li>
+   *   Checking the input and output specifications of the job.
+   *   </li>
+   *   <li>
+   *   Computing the {@link InputSplit}s for the job.
+   *   </li>
+   *   <li>
+   *   Setting up the requisite accounting information for the 
+   *   {@link DistributedCache} of the job, if necessary.
+   *   </li>
+   *   <li>
+   *   Copying the job's jar and configuration to the map-reduce system
+   *   directory on the distributed file-system. 
+   *   </li>
+   *   <li>
+   *   Submitting the job to the <code>JobTracker</code> and optionally
+   *   monitoring its status.
+   *   </li>
+   * </ol></p>
+   * @param job the configuration to submit
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  JobStatus submitJobInternal(Job job) throws ClassNotFoundException,
+      InterruptedException, IOException {
+    
+    //configure the command line options correctly on the submitting dfs
+    Configuration conf = job.getConfiguration();
+    JobID jobId = submitClient.getNewJobID();
+    Path submitJobDir = new Path(submitClient.getSystemDir(), jobId.toString());
+    Path submitJarFile = new Path(submitJobDir, "job.jar");
+    Path submitSplitFile = new Path(submitJobDir, "job.split");
+    configureCommandLineOptions(job, submitJobDir, submitJarFile);
+    Path submitJobFile = new Path(submitJobDir, "job.xml");
+    
+    checkSpecs(job);
+
+    // Create the splits for the job
+    LOG.info("Creating splits at " + jtFs.makeQualified(submitSplitFile));
+    int maps = writeSplits(job, submitSplitFile);
+    conf.set("mapred.job.split.file", submitSplitFile.toString());
+    conf.setInt("mapred.map.tasks", maps);
+    LOG.info("number of splits:" + maps);
+    
+    // Write job file to JobTracker's fs
+    writeConf(conf, submitJobFile);
+    
+    //
+    // Now, actually submit the job (using the submit name)
+    //
+    JobStatus status = submitClient.submitJob(jobId);
+    if (status != null) {
+      return status;
+    } else {
+      throw new IOException("Could not launch job");
+    }
+  }
+
+  private void checkSpecs(Job job) throws ClassNotFoundException, 
+      InterruptedException, IOException {
+    JobConf jConf = (JobConf)job.getConfiguration();
+    // Check the output specification
+    if (jConf.getNumReduceTasks() == 0 ? 
+        jConf.getUseNewMapper() : jConf.getUseNewReducer()) {
+      org.apache.hadoop.mapreduce.OutputFormat<?, ?> output =
+        ReflectionUtils.newInstance(job.getOutputFormatClass(),
+          job.getConfiguration());
+      output.checkOutputSpecs(job);
+    } else {
+      jConf.getOutputFormat().checkOutputSpecs(jtFs, jConf);
+    }
+  }
+  
+  private void writeConf(Configuration conf, Path jobFile) 
+      throws IOException {
+    // Write job file to JobTracker's fs        
+    FSDataOutputStream out = 
+      FileSystem.create(jtFs, jobFile, 
+                        new FsPermission(JOB_FILE_PERMISSION));
+    try {
+      conf.writeXml(out);
+    } finally {
+      out.close();
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  private <T extends InputSplit> 
+  int writeNewSplits(JobContext job, Path submitSplitFile) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    Configuration conf = job.getConfiguration();
+    InputFormat<?, ?> input =
+      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
+    
+    List<InputSplit> splits = input.getSplits(job);
+    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
+
+    // sort the splits by size in descending order, so that the biggest
+    // go first
+    Arrays.sort(array, new SplitComparator());
+    DataOutputStream out = writeSplitsFileHeader(conf, submitSplitFile, 
+                                                 array.length);
+    try {
+      if (array.length != 0) {
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        Job.RawSplit rawSplit = new Job.RawSplit();
+        SerializationFactory factory = new SerializationFactory(conf);
+        Serializer<T> serializer = 
+          factory.getSerializer((Class<T>) array[0].getClass());
+        serializer.open(buffer);
+        for (T split: array) {
+          rawSplit.setClassName(split.getClass().getName());
+          buffer.reset();
+          serializer.serialize(split);
+          rawSplit.setDataLength(split.getLength());
+          rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
+          rawSplit.setLocations(split.getLocations());
+          rawSplit.write(out);
+        }
+        serializer.close();
+      }
+    } finally {
+      out.close();
+    }
+    return array.length;
+  }
+
+  static final int CURRENT_SPLIT_FILE_VERSION = 0;
+  static final byte[] SPLIT_FILE_HEADER = "SPL".getBytes();
+
+  private DataOutputStream writeSplitsFileHeader(Configuration conf,
+      Path filename, int length) throws IOException {
+    // write the splits to a file for the job tracker
+    FileSystem fs = filename.getFileSystem(conf);
+    FSDataOutputStream out = 
+      FileSystem.create(fs, filename, new FsPermission(JOB_FILE_PERMISSION));
+    out.write(SPLIT_FILE_HEADER);
+    WritableUtils.writeVInt(out, CURRENT_SPLIT_FILE_VERSION);
+    WritableUtils.writeVInt(out, length);
+    return out;
+  }
+
+  private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
+                  Path submitSplitFile) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    JobConf jConf = (JobConf)job.getConfiguration();
+    // Create the splits for the job
+    LOG.debug("Creating splits at " + jtFs.makeQualified(submitSplitFile));
+    int maps;
+    if (jConf.getUseNewMapper()) {
+      maps = writeNewSplits(job, submitSplitFile);
+    } else {
+      maps = writeOldSplits(jConf, submitSplitFile);
+    }
+    return maps;
+  }
+
+  // write splits for mappers that use the old API
+  private int writeOldSplits(JobConf job, 
+      Path submitSplitFile) throws IOException {
+    org.apache.hadoop.mapred.InputSplit[] splits = 
+    job.getInputFormat().getSplits(job, job.getNumMapTasks());
+    // sort the splits by size in descending order, so that the biggest
+    // go first
+    Arrays.sort(splits, new Comparator<org.apache.hadoop.mapred.InputSplit>() {
+      public int compare(org.apache.hadoop.mapred.InputSplit a,
+                         org.apache.hadoop.mapred.InputSplit b) {
+        try {
+          long left = a.getLength();
+          long right = b.getLength();
+          if (left == right) {
+            return 0;
+          } else if (left < right) {
+            return 1;
+          } else {
+            return -1;
+          }
+        } catch (IOException ie) {
+          throw new RuntimeException("Problem getting input split size", ie);
+        }
+      }
+    });
+    DataOutputStream out = writeSplitsFileHeader(job, submitSplitFile,
+      splits.length);
+
+    try {
+      DataOutputBuffer buffer = new DataOutputBuffer();
+      Job.RawSplit rawSplit = new Job.RawSplit();
+      for (org.apache.hadoop.mapred.InputSplit split: splits) {
+        rawSplit.setClassName(split.getClass().getName());
+        buffer.reset();
+        split.write(buffer);
+        rawSplit.setDataLength(split.getLength());
+        rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
+        rawSplit.setLocations(split.getLocations());
+        rawSplit.write(out);
+      }
+    } finally {
+      out.close();
+    }
+    return splits.length;
+  }
+  
+  private static class SplitComparator implements Comparator<InputSplit> {
+    @Override
+    public int compare(InputSplit o1, InputSplit o2) {
+      try {
+        long len1 = o1.getLength();
+        long len2 = o2.getLength();
+        if (len1 < len2) {
+          return 1;
+        } else if (len1 == len2) {
+          return 0;
+        } else {
+          return -1;
+        }
+      } catch (IOException ie) {
+        throw new RuntimeException("exception in compare", ie);
+      } catch (InterruptedException ie) {
+        throw new RuntimeException("exception in compare", ie);        
+      }
+    }
+  }
+}
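
To make the split-file layout concrete: writeSplitsFileHeader() writes the literal bytes "SPL", a vint format version (currently 0) and a vint split count, followed by one serialized Job.RawSplit per split. Below is a small sketch (not part of this patch; the class name is hypothetical) that reads just that header back, using only the fields shown above.

    import java.io.IOException;
    import java.util.Arrays;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.WritableUtils;

    public class SplitFileHeaderReader {
      public static int readSplitCount(Configuration conf, Path splitFile)
          throws IOException {
        FileSystem fs = splitFile.getFileSystem(conf);
        FSDataInputStream in = fs.open(splitFile);
        try {
          byte[] header = new byte[3];
          in.readFully(header);                      // the "SPL" marker
          if (!Arrays.equals(header, "SPL".getBytes())) {
            throw new IOException("Not a split file: " + splitFile);
          }
          int version = WritableUtils.readVInt(in);  // CURRENT_SPLIT_FILE_VERSION
          if (version != 0) {
            throw new IOException("Unsupported split file version " + version);
          }
          return WritableUtils.readVInt(in);         // number of raw splits
        } finally {
          in.close();
        }
      }
    }

As a usage note, the tmpfiles, tmpjars and tmparchives keys consumed by copyAndConfigureFiles() are normally populated by GenericOptionsParser from the -files, -libjars and -archives command-line options, which is why the code warns when that parser was not used.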

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/QueueAclsInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/QueueAclsInfo.java?rev=816496&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/QueueAclsInfo.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/QueueAclsInfo.java Fri Sep 18 07:04:42 2009
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ *  Class to encapsulate Queue ACLs for a particular
+ *  user.
+ * 
+ */
+public class QueueAclsInfo implements Writable {
+
+  private String queueName;
+  private String[] operations;
+  /**
+   * Default constructor for QueueAclsInfo.
+   * 
+   */
+  public QueueAclsInfo() {
+    
+  }
+
+  /**
+   * Construct a new QueueAclsInfo object using the queue name and the
+   * queue operations array.
+   * 
+   * @param queueName Name of the job queue
+   * @param operations Operations the user is allowed to perform on the queue
+   */
+  public QueueAclsInfo(String queueName, String[] operations) {
+    this.queueName = queueName;
+    this.operations = operations;    
+  }
+
+  /**
+   * Get queue name.
+   * 
+   * @return name
+   */
+  public String getQueueName() {
+    return queueName;
+  }
+
+  protected void setQueueName(String queueName) {
+    this.queueName = queueName;
+  }
+
+  /**
+   * Get operations allowed on the queue.
+   * 
+   * @return array of operation names
+   */
+  public String[] getOperations() {
+    return operations;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    queueName = Text.readString(in);
+    operations = WritableUtils.readStringArray(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, queueName);
+    WritableUtils.writeStringArray(out, operations);
+  }
+}
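
Since QueueAclsInfo is a Writable, presumably returned to clients over the job client protocol, a quick sketch of the round trip may help. This is illustrative only; the class name and the operation strings below are placeholders, not values defined by this patch.

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.mapreduce.QueueAclsInfo;

    public class QueueAclsInfoRoundTrip {
      public static void main(String[] args) throws Exception {
        QueueAclsInfo original = new QueueAclsInfo(
            "default", new String[] {"submit-job", "administer-jobs"});

        // serialize into an in-memory buffer, as the RPC layer would
        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);

        // read it back into a fresh instance on the receiving side
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        QueueAclsInfo copy = new QueueAclsInfo();
        copy.readFields(in);

        System.out.println(copy.getQueueName() + " -> "
            + java.util.Arrays.toString(copy.getOperations()));
      }
    }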


