hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ste...@apache.org
Subject svn commit: r885145 [15/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/ src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/ src/benchmarks/gridmix2/ src/benchmarks/gridmix2/sr...
Date Sat, 28 Nov 2009 20:26:22 GMT
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/IsolationRunner.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/IsolationRunner.java Sat Nov 28 20:26:01 2009
@@ -33,6 +33,7 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JvmTask;
+import org.apache.hadoop.mapreduce.MRConfig;
 
 /**
  * IsolationRunner is intended to facilitate debugging by re-running a specific
@@ -40,7 +41,7 @@
  * Currently, it is limited to re-running map tasks.
  *
  * Users may coerce MapReduce to keep task files around by setting 
- * keep.failed.task.files.  See mapred_tutorial.xml for more documentation.
+ * mapreduce.task.files.preserve.failedtasks.  See mapred_tutorial.xml for more documentation.
  */
 public class IsolationRunner {
   private static final Log LOG = 
@@ -64,6 +65,10 @@
       LOG.info("Task " + taskId + " reporting shuffle error: " + message);
     }
 
+    public void fatalError(TaskAttemptID taskId, String msg) throws IOException {
+      LOG.info("Task " + taskId + " reporting fatal error: " + msg);
+    }
+
     public JvmTask getTask(JvmContext context) throws IOException {
       return null;
     }
@@ -149,21 +154,21 @@
       return false;
     }
     JobConf conf = new JobConf(new Path(jobFilename.toString()));
-    TaskAttemptID taskId = TaskAttemptID.forName(conf.get("mapred.task.id"));
+    TaskAttemptID taskId = TaskAttemptID.forName(conf.get(JobContext.TASK_ATTEMPT_ID));
     if (taskId == null) {
-      System.out.println("mapred.task.id not found in configuration;" + 
+      System.out.println("mapreduce.task.attempt.id not found in configuration;" + 
           " job.xml is not a task config");
     }
-    boolean isMap = conf.getBoolean("mapred.task.is.map", true);
+    boolean isMap = conf.getBoolean(JobContext.TASK_ISMAP, true);
     if (!isMap) {
       System.out.println("Only map tasks are supported.");
       return false;
     }
-    int partition = conf.getInt("mapred.task.partition", 0);
+    int partition = conf.getInt(JobContext.TASK_PARTITION, 0);
     
     // setup the local and user working directories
     FileSystem local = FileSystem.getLocal(conf);
-    LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+    LocalDirAllocator lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
 
     File workDirName = TaskRunner.formWorkDir(lDirAlloc, taskId, false, conf);
     local.setWorkingDirectory(new Path(workDirName.toString()));
@@ -179,9 +184,9 @@
     // any of the configured local disks, so use LocalDirAllocator to find out
     // where it is.
     Path localSplit =
-        new LocalDirAllocator("mapred.local.dir").getLocalPathToRead(
-            TaskTracker.getLocalSplitFile(taskId.getJobID().toString(), taskId
-                .toString()), conf);
+        new LocalDirAllocator(MRConfig.LOCAL_DIR).getLocalPathToRead(
+            TaskTracker.getLocalSplitFile(conf.getUser(), taskId.getJobID()
+                .toString(), taskId.toString()), conf);
     DataInputStream splitFile = FileSystem.getLocal(conf).open(localSplit);
     String splitClass = Text.readString(splitFile);
     BytesWritable split = new BytesWritable();

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JSPUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JSPUtil.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JSPUtil.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JSPUtil.java Sat Nov 28 20:26:01 2009
@@ -27,12 +27,15 @@
 
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import javax.servlet.jsp.JspWriter;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.JobHistory.JobInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.util.ServletUtil;
 import org.apache.hadoop.util.StringUtils;
 
@@ -46,7 +49,7 @@
     new LinkedHashMap<String, JobInfo>(); 
 
   private static final int CACHE_SIZE = 
-    conf.getInt("mapred.job.tracker.jobhistory.lru.cache.size", 5);
+    conf.getInt(JTConfig.JT_JOBHISTORY_CACHE_SIZE, 5);
 
   private static final Log LOG = LogFactory.getLog(JSPUtil.class);
   /**
@@ -257,6 +260,43 @@
     return sb.toString();
   }
 
+  @SuppressWarnings("unchecked")
+  public static void generateRetiredJobXml(JspWriter out, JobTracker tracker, int rowId)
+      throws IOException {
+
+    Iterator<JobStatus> iterator =
+      tracker.retireJobs.getAll().descendingIterator();
+
+    for (int i = 0; i < 100 && iterator.hasNext(); i++) {
+      JobStatus status = iterator.next();
+      StringBuilder sb = new StringBuilder();
+      sb.append("<retired_job rowid=\"" + rowId + "\" jobid=\"" + status.getJobId() + "\">");
+      sb.append("<jobid>" + status.getJobId() + "</jobid>");
+      sb.append("<history_url>jobdetailshistory.jsp?jobid=" + status.getJobId()
+          + "&amp;logFile="
+          + URLEncoder.encode(status.getHistoryFile().toString(), "UTF-8")
+          + "</history_url>");
+      sb.append("<priority>" + status.getJobPriority().toString()
+          + "</priority>");
+      sb.append("<user>" + status.getUsername() + "</user>");
+      sb.append("<name>" + status.getJobName() + "</name>");
+      sb.append("<run_state>" + JobStatus.getJobRunState(status.getRunState())
+          + "</run_state>");
+      sb.append("<start_time>" + new Date(status.getStartTime())
+          + "</start_time>");
+      sb.append("<finish_time>" + new Date(status.getFinishTime())
+          + "</finish_time>");
+      sb.append("<map_complete>" + StringUtils.formatPercent(
+          status.mapProgress(), 2) + "</map_complete>");
+      sb.append("<reduce_complete>" + StringUtils.formatPercent(
+          status.reduceProgress(), 2) + "</reduce_complete>");
+      sb.append("<scheduling_info>" + status.getSchedulingInfo() + "</scheduling_info>");
+      sb.append("</retired_job>\n");
+      out.write(sb.toString());
+      rowId++;
+    }
+  }
+
   static final boolean privateActionsAllowed() {
     return conf.getBoolean(PRIVATE_ACTIONS_KEY, false);
   }
@@ -268,10 +308,10 @@
     synchronized(jobHistoryCache) {
       JobInfo jobInfo = jobHistoryCache.remove(jobid);
       if (jobInfo == null) {
-        jobInfo = new JobHistory.JobInfo(jobid);
+        JobHistoryParser parser = new JobHistoryParser(fs, logFile);
+        jobInfo = parser.parse();
         LOG.info("Loading Job History file "+jobid + ".   Cache size is " +
             jobHistoryCache.size());
-        DefaultJobHistoryParser.parseJobTasks( logFile, jobInfo, fs) ; 
       }
       jobHistoryCache.put(jobid, jobInfo);
       if (jobHistoryCache.size() > CACHE_SIZE) {

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobClient.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobClient.java Sat Nov 28 20:26:01 2009
@@ -17,60 +17,28 @@
  */
 package org.apache.hadoop.mapred;
 
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.net.URL;
-import java.net.URLConnection;
-import java.net.UnknownHostException;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.List;
 
-import javax.security.auth.login.LoginException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
-import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
-import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.mapreduce.tools.CLI;
+import org.apache.hadoop.mapreduce.util.ConfigUtil;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.Counters.Group;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UnixUserGroupInformation;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -156,16 +124,15 @@
  * @see ClusterStatus
  * @see Tool
  * @see DistributedCache
+ * @deprecated Use {@link Job} and {@link Cluster} instead
  */
-public class JobClient extends Configured implements MRConstants, Tool  {
-  private static final Log LOG = LogFactory.getLog(JobClient.class);
+@Deprecated
+public class JobClient extends CLI {
   public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
   private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED; 
-  private static final long MAX_JOBPROFILE_AGE = 1000 * 2;
 
   static{
-    Configuration.addDefaultResource("mapred-default.xml");
-    Configuration.addDefaultResource("mapred-site.xml");
+    ConfigUtil.loadResources();
   }
 
   /**
@@ -174,9 +141,7 @@
    * remote service to provide certain functionality.
    */
   class NetworkedJob implements RunningJob {
-    JobStatus status;
-    long statustime;
-
+    Job job;
     /**
      * We store a JobProfile and a timestamp for when we last
      * acquired the job profile.  If the job is null, then we cannot
@@ -184,66 +149,47 @@
      * has completely forgotten about the job.  (eg, 24 hours after the
      * job completes.)
      */
-    public NetworkedJob(JobStatus job) throws IOException {
-      this.status = job;
-      this.statustime = System.currentTimeMillis();
+    public NetworkedJob(JobStatus status) throws IOException {
+      job = Job.getInstance(cluster, status, new JobConf(status.getJobFile()));
     }
 
-    /**
-     * Some methods rely on having a recent job profile object.  Refresh
-     * it, if necessary
-     */
-    synchronized void ensureFreshStatus() throws IOException {
-      if (System.currentTimeMillis() - statustime > MAX_JOBPROFILE_AGE) {
-        updateStatus();
-      }
-    }
-    
-    /** Some methods need to update status immediately. So, refresh
-     * immediately
-     * @throws IOException
-     */
-    synchronized void updateStatus() throws IOException {
-      this.status = jobSubmitClient.getJobStatus(status.getJobID());
-      if (this.status == null) {
-        throw new IOException("Job status not available ");
-      }
-      this.statustime = System.currentTimeMillis();
+    public NetworkedJob(Job job) throws IOException {
+      this.job = job;
     }
 
     /**
      * An identifier for the job
      */
     public JobID getID() {
-      return status.getJobID();
+      return JobID.downgrade(job.getID());
     }
     
     /** @deprecated This method is deprecated and will be removed. Applications should 
      * rather use {@link #getID()}.*/
     @Deprecated
     public String getJobID() {
-      return status.getJobID().toString();
+      return getID().toString();
     }
     
     /**
      * The user-specified job name
      */
     public String getJobName() {
-      return status.getJobName();
+      return job.getJobName();
     }
 
     /**
      * The name of the job file
      */
     public String getJobFile() {
-      return status.getJobFile();
+      return job.getJobFile();
     }
 
     /**
      * A URL where the job's status can be seen
      */
     public String getTrackingURL() {
-      return status.getTrackingUrl();
+      return job.getTrackingURL();
     }
 
     /**
@@ -251,8 +197,11 @@
      * completed.
      */
     public float mapProgress() throws IOException {
-      ensureFreshStatus();
-      return status.mapProgress();
+      try {
+        return job.mapProgress();
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
+      }
     }
 
     /**
@@ -260,8 +209,11 @@
      * completed.
      */
     public float reduceProgress() throws IOException {
-      ensureFreshStatus();
-      return status.reduceProgress();
+      try {
+        return job.reduceProgress();
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
+      }
     }
 
     /**
@@ -269,8 +221,11 @@
      * completed.
      */
     public float cleanupProgress() throws IOException {
-      ensureFreshStatus();
-      return status.cleanupProgress();
+      try {
+        return job.cleanupProgress();
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
+      }
     }
 
     /**
@@ -278,37 +233,45 @@
      * completed.
      */
     public float setupProgress() throws IOException {
-      ensureFreshStatus();
-      return status.setupProgress();
+      try {
+        return job.setupProgress();
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
+      }
     }
 
     /**
      * Returns immediately whether the whole job is done yet or not.
      */
     public synchronized boolean isComplete() throws IOException {
-      updateStatus();
-      return (status.getRunState() == JobStatus.SUCCEEDED ||
-              status.getRunState() == JobStatus.FAILED ||
-              status.getRunState() == JobStatus.KILLED);
+      try {
+        return job.isComplete();
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
+      }
     }
 
     /**
      * True iff job completed successfully.
      */
     public synchronized boolean isSuccessful() throws IOException {
-      updateStatus();
-      return status.getRunState() == JobStatus.SUCCEEDED;
+      try {
+        return job.isSuccessful();
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
+      }
     }
 
     /**
      * Blocks until the job is finished
      */
     public void waitForCompletion() throws IOException {
-      while (!isComplete()) {
-        try {
-          Thread.sleep(5000);
-        } catch (InterruptedException ie) {
-        }
+      try {
+        job.waitForCompletion(false);
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
+      } catch (ClassNotFoundException ce) {
+        throw new IOException(ce);
       }
     }
 
@@ -316,15 +279,22 @@
      * Tells the service to get the state of the current job.
      */
     public synchronized int getJobState() throws IOException {
-      updateStatus();
-      return status.getRunState();
+      try {
+        return job.getJobState().getValue();
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
+      }
     }
     
     /**
      * Tells the service to terminate the current job.
      */
     public synchronized void killJob() throws IOException {
-      jobSubmitClient.killJob(getID());
+      try {
+        job.killJob();
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
+      }
     }
    
     
@@ -333,7 +303,12 @@
     */
     public synchronized void setJobPriority(String priority) 
                                                 throws IOException {
-      jobSubmitClient.setJobPriority(getID(), priority);
+      try {
+        job.setPriority(
+          org.apache.hadoop.mapreduce.JobPriority.valueOf(priority));
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
+      }
     }
     
     /**
@@ -342,8 +317,17 @@
      * @param shouldFail if true the task is failed and added to failed tasks list, otherwise
      * it is just killed, w/o affecting job failure status.
      */
-    public synchronized void killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException {
-      jobSubmitClient.killTask(taskId, shouldFail);
+    public synchronized void killTask(TaskAttemptID taskId,
+        boolean shouldFail) throws IOException {
+      try {
+        if (shouldFail) {
+          job.failTask(taskId);
+        } else {
+          job.killTask(taskId);
+        }
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
+      }
     }
 
     /** @deprecated Applications should rather use {@link #killTask(TaskAttemptID, boolean)}*/
@@ -356,9 +340,18 @@
      * Fetch task completion events from jobtracker for this job. 
      */
     public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
-                                                                      int startFrom) throws IOException{
-      return jobSubmitClient.getTaskCompletionEvents(
-                                                     getID(), startFrom, 10); 
+        int startFrom) throws IOException {
+      try {
+        org.apache.hadoop.mapreduce.TaskCompletionEvent[] acls = 
+          job.getTaskCompletionEvents(startFrom, 10);
+        TaskCompletionEvent[] ret = new TaskCompletionEvent[acls.length];
+        for (int i = 0 ; i < acls.length; i++ ) {
+          ret[i] = TaskCompletionEvent.downgrade(acls[i]);
+        }
+        return ret;
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
+      }
     }
 
     /**
@@ -366,48 +359,52 @@
      */
     @Override
     public String toString() {
-      try {
-        updateStatus();
-      } catch (IOException e) {
-      }
-      return "Job: " + status.getJobID() + "\n" +
-        "status: " + JobStatus.getJobRunState(status.getRunState()) + "\n" + 
-        "file: " + status.getJobFile() + "\n" + 
-        "tracking URL: " + status.getTrackingUrl() + "\n" + 
-        "map() completion: " + status.mapProgress() + "\n" + 
-        "reduce() completion: " + status.reduceProgress() + "\n" +
-        "history URL: " + status.getHistoryFile() + "\n" +
-        "retired: " + status.isRetired();
+      return job.toString();
     }
         
     /**
      * Returns the counters for this job
      */
     public Counters getCounters() throws IOException {
-      return jobSubmitClient.getJobCounters(getID());
+      try { 
+        return Counters.downgrade(job.getCounters());
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
+      }
     }
     
     @Override
     public String[] getTaskDiagnostics(TaskAttemptID id) throws IOException {
-      return jobSubmitClient.getTaskDiagnostics(id);
+      try { 
+        return job.getTaskDiagnostics(id);
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
+      }
     }
 
     public String getHistoryUrl() throws IOException {
-      updateStatus();
-      return status.getHistoryFile();
+      try {
+        return job.getHistoryUrl();
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
+      }
     }
 
     public boolean isRetired() throws IOException {
-      updateStatus();
-      return status.isRetired();
+      try {
+        return job.isRetired();
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
+      }
+    }
+    
+    boolean monitorAndPrintJob() throws IOException, InterruptedException {
+      return job.monitorAndPrintJob();
     }
   }
 
-  private JobSubmissionProtocol jobSubmitClient;
-  private Path sysDir = null;
+  Cluster cluster;
   
-  private FileSystem fs = null;
-
   /**
    * Create a job client.
    */
@@ -422,7 +419,6 @@
    * @throws IOException
    */
   public JobClient(JobConf conf) throws IOException {
-    setConf(conf);
     init(conf);
   }
 
@@ -434,7 +430,6 @@
    * @throws IOException
    */
   public JobClient(Configuration conf) throws IOException {
-    setConf(conf);
     init(new JobConf(conf));
   }
 
@@ -444,19 +439,8 @@
    * @throws IOException
    */
   public void init(JobConf conf) throws IOException {
-    String tracker = conf.get("mapred.job.tracker", "local");
-    if ("local".equals(tracker)) {
-      this.jobSubmitClient = new LocalJobRunner(conf);
-    } else {
-      this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
-    }        
-  }
-
-  private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
-      Configuration conf) throws IOException {
-    return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
-        JobSubmissionProtocol.versionID, addr, getUGI(conf), conf,
-        NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
+    setConf(conf);
+    cluster = new Cluster(conf);
   }
 
   /**
@@ -467,16 +451,14 @@
    */
   public JobClient(InetSocketAddress jobTrackAddr, 
                    Configuration conf) throws IOException {
-    jobSubmitClient = createRPCProxy(jobTrackAddr, conf);
+    cluster = new Cluster(jobTrackAddr, conf);
   }
 
   /**
    * Close the <code>JobClient</code>.
    */
   public synchronized void close() throws IOException {
-    if (!(jobSubmitClient instanceof LocalJobRunner)) {
-      RPC.stopProxy(jobSubmitClient);
-    }
+    cluster.close();
   }
 
   /**
@@ -486,208 +468,11 @@
    * @return the filesystem handle.
    */
   public synchronized FileSystem getFs() throws IOException {
-    if (this.fs == null) {
-      Path sysDir = getSystemDir();
-      this.fs = sysDir.getFileSystem(getConf());
-    }
-    return fs;
-  }
-  
-  /* see if two file systems are the same or not
-   *
-   */
-  private boolean compareFs(FileSystem srcFs, FileSystem destFs) {
-    URI srcUri = srcFs.getUri();
-    URI dstUri = destFs.getUri();
-    if (srcUri.getScheme() == null) {
-      return false;
-    }
-    if (!srcUri.getScheme().equals(dstUri.getScheme())) {
-      return false;
-    }
-    String srcHost = srcUri.getHost();    
-    String dstHost = dstUri.getHost();
-    if ((srcHost != null) && (dstHost != null)) {
-      try {
-        srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
-        dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
-      } catch(UnknownHostException ue) {
-        return false;
-      }
-      if (!srcHost.equals(dstHost)) {
-        return false;
-      }
-    }
-    else if (srcHost == null && dstHost != null) {
-      return false;
-    }
-    else if (srcHost != null && dstHost == null) {
-      return false;
-    }
-    //check for ports
-    if (srcUri.getPort() != dstUri.getPort()) {
-      return false;
-    }
-    return true;
-  }
-
-  // copies a file to the jobtracker filesystem and returns the path where it
-  // was copied to
-  private Path copyRemoteFiles(FileSystem jtFs, Path parentDir, Path originalPath, 
-                               JobConf job, short replication) throws IOException {
-    //check if we do not need to copy the files
-    // is jt using the same file system.
-    // just checking for uri strings... doing no dns lookups 
-    // to see if the filesystems are the same. This is not optimal.
-    // but avoids name resolution.
-    
-    FileSystem remoteFs = null;
-    remoteFs = originalPath.getFileSystem(job);
-    if (compareFs(remoteFs, jtFs)) {
-      return originalPath;
-    }
-    // this might have name collisions. copy will throw an exception
-    //parse the original path to create new path
-    Path newPath = new Path(parentDir, originalPath.getName());
-    FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, job);
-    jtFs.setReplication(newPath, replication);
-    return newPath;
-  }
- 
-  /**
-   * configure the jobconf of the user with the command line options of 
-   * -libjars, -files, -archives
-   * @param conf
-   * @throws IOException
-   */
-  private void configureCommandLineOptions(JobConf job, Path submitJobDir, Path submitJarFile) 
-    throws IOException {
-    
-    if (!(job.getBoolean("mapred.used.genericoptionsparser", false))) {
-      LOG.warn("Use GenericOptionsParser for parsing the arguments. " +
-               "Applications should implement Tool for the same.");
-    }
-
-    // Retrieve command line arguments placed into the JobConf
-    // by GenericOptionsParser.
-    String files = job.get("tmpfiles");
-    String libjars = job.get("tmpjars");
-    String archives = job.get("tmparchives");
-
-    /*
-     * set this user's id in job configuration, so later job files can be
-     * accessed using this user's id
-     */
-    UnixUserGroupInformation ugi = getUGI(job);
-      
-    //
-    // Figure out what fs the JobTracker is using.  Copy the
-    // job to it, under a temporary name.  This allows DFS to work,
-    // and under the local fs also provides UNIX-like object loading 
-    // semantics.  (that is, if the job file is deleted right after
-    // submission, we can still run the submission to completion)
-    //
-
-    // Create a number of filenames in the JobTracker's fs namespace
-    FileSystem fs = getFs();
-    LOG.debug("default FileSystem: " + fs.getUri());
-    fs.delete(submitJobDir, true);
-    submitJobDir = fs.makeQualified(submitJobDir);
-    submitJobDir = new Path(submitJobDir.toUri().getPath());
-    FsPermission mapredSysPerms = new FsPermission(JOB_DIR_PERMISSION);
-    FileSystem.mkdirs(fs, submitJobDir, mapredSysPerms);
-    Path filesDir = new Path(submitJobDir, "files");
-    Path archivesDir = new Path(submitJobDir, "archives");
-    Path libjarsDir = new Path(submitJobDir, "libjars");
-    short replication = (short)job.getInt("mapred.submit.replication", 10);
-    // add all the command line files/ jars and archive
-    // first copy them to jobtrackers filesystem 
-    
-    if (files != null) {
-      FileSystem.mkdirs(fs, filesDir, mapredSysPerms);
-      String[] fileArr = files.split(",");
-      for (String tmpFile: fileArr) {
-        Path tmp = new Path(tmpFile);
-        Path newPath = copyRemoteFiles(fs,filesDir, tmp, job, replication);
-        try {
-          URI pathURI = new URI(newPath.toUri().toString() + "#" + newPath.getName());
-          DistributedCache.addCacheFile(pathURI, job);
-        } catch(URISyntaxException ue) {
-          //should not throw a uri exception 
-          throw new IOException("Failed to create uri for " + tmpFile);
-        }
-        DistributedCache.createSymlink(job);
-      }
-    }
-    
-    if (libjars != null) {
-      FileSystem.mkdirs(fs, libjarsDir, mapredSysPerms);
-      String[] libjarsArr = libjars.split(",");
-      for (String tmpjars: libjarsArr) {
-        Path tmp = new Path(tmpjars);
-        Path newPath = copyRemoteFiles(fs, libjarsDir, tmp, job, replication);
-        DistributedCache.addFileToClassPath(newPath, job);
-      }
-    }
-    
-    
-    if (archives != null) {
-     FileSystem.mkdirs(fs, archivesDir, mapredSysPerms); 
-     String[] archivesArr = archives.split(",");
-     for (String tmpArchives: archivesArr) {
-       Path tmp = new Path(tmpArchives);
-       Path newPath = copyRemoteFiles(fs, archivesDir, tmp, job, replication);
-       try {
-         URI pathURI = new URI(newPath.toUri().toString() + "#" + newPath.getName());
-         DistributedCache.addCacheArchive(pathURI, job);
-       } catch(URISyntaxException ue) {
-         //should not throw an uri excpetion
-         throw new IOException("Failed to create uri for " + tmpArchives);
-       }
-       DistributedCache.createSymlink(job);
-     }
-    }
-    
-    //  set the timestamps of the archives and files
-    TrackerDistributedCacheManager.determineTimestamps(job);
-       
-    String originalJarPath = job.getJar();
-
-    if (originalJarPath != null) {           // copy jar to JobTracker's fs
-      // use jar name if job is not named. 
-      if ("".equals(job.getJobName())){
-        job.setJobName(new Path(originalJarPath).getName());
-      }
-      job.setJar(submitJarFile.toString());
-      fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile);
-      fs.setReplication(submitJarFile, replication);
-      fs.setPermission(submitJarFile, new FsPermission(JOB_FILE_PERMISSION));
-    } else {
-      LOG.warn("No job jar file set.  User classes may not be found. "+
-               "See JobConf(Class) or JobConf#setJar(String).");
-    }
-
-    // Set the user's name and working directory
-    job.setUser(ugi.getUserName());
-    if (ugi.getGroupNames().length > 0) {
-      job.set("group.name", ugi.getGroupNames()[0]);
-    }
-    if (job.getWorkingDirectory() == null) {
-      job.setWorkingDirectory(fs.getWorkingDirectory());          
-    }
-
-  }
-
-
-  private UnixUserGroupInformation getUGI(Configuration job) throws IOException {
-    UnixUserGroupInformation ugi = null;
-    try {
-      ugi = UnixUserGroupInformation.login(job, true);
-    } catch (LoginException e) {
-      throw (IOException)(new IOException(
-          "Failed to get the current user's information.").initCause(e));
+    try { 
+      return cluster.getFileSystem();
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
     }
-    return ugi;
   }
   
   /**
@@ -711,29 +496,26 @@
     return submitJob(job);
   }
     
-  // job files are world-wide readable and owner writable
-  final private static FsPermission JOB_FILE_PERMISSION = 
-    FsPermission.createImmutable((short) 0644); // rw-r--r--
-
-  // job submission directory is world readable/writable/executable
-  final static FsPermission JOB_DIR_PERMISSION =
-    FsPermission.createImmutable((short) 0777); // rwx-rwx-rwx
-   
   /**
    * Submit a job to the MR system.
    * This returns a handle to the {@link RunningJob} which can be used to track
    * the running-job.
    * 
-   * @param job the job configuration.
+   * @param conf the job configuration.
    * @return a handle to the {@link RunningJob} which can be used to track the
    *         running-job.
    * @throws FileNotFoundException
    * @throws IOException
    */
-  public RunningJob submitJob(JobConf job) throws FileNotFoundException,
+  public RunningJob submitJob(JobConf conf) throws FileNotFoundException,
                                                   IOException {
     try {
-      return submitJobInternal(job);
+      conf.setBooleanIfUnset("mapred.mapper.new-api", false);
+      conf.setBooleanIfUnset("mapred.reducer.new-api", false);
+      Job job = Job.getInstance(cluster, conf);
+      job.submit();
+      conf.setUser(job.getUser());
+      return new NetworkedJob(job);
     } catch (InterruptedException ie) {
       throw new IOException("interrupted", ie);
     } catch (ClassNotFoundException cnfe) {
@@ -741,187 +523,6 @@
     }
   }
 
-  /**
-   * Internal method for submitting jobs to the system.
-   * @param job the configuration to submit
-   * @return a proxy object for the running job
-   * @throws FileNotFoundException
-   * @throws ClassNotFoundException
-   * @throws InterruptedException
-   * @throws IOException
-   */
-  public 
-  RunningJob submitJobInternal(JobConf job
-                               ) throws FileNotFoundException, 
-                                        ClassNotFoundException,
-                                        InterruptedException,
-                                        IOException {
-    /*
-     * configure the command line options correctly on the submitting dfs
-     */
-    
-    JobID jobId = jobSubmitClient.getNewJobId();
-    Path submitJobDir = new Path(getSystemDir(), jobId.toString());
-    Path submitJarFile = new Path(submitJobDir, "job.jar");
-    Path submitSplitFile = new Path(submitJobDir, "job.split");
-    configureCommandLineOptions(job, submitJobDir, submitJarFile);
-    Path submitJobFile = new Path(submitJobDir, "job.xml");
-    int reduces = job.getNumReduceTasks();
-    JobContext context = new JobContext(job, jobId);
-    
-    // Check the output specification
-    if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
-      org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
-        ReflectionUtils.newInstance(context.getOutputFormatClass(), job);
-      output.checkOutputSpecs(context);
-    } else {
-      job.getOutputFormat().checkOutputSpecs(getFs(), job);
-    }
-
-    // Create the splits for the job
-    LOG.debug("Creating splits at " + getFs().makeQualified(submitSplitFile));
-    int maps;
-    if (job.getUseNewMapper()) {
-      maps = writeNewSplits(context, submitSplitFile);
-    } else {
-      maps = writeOldSplits(job, submitSplitFile);
-    }
-    job.set("mapred.job.split.file", submitSplitFile.toString());
-    job.setNumMapTasks(maps);
-        
-    // Write job file to JobTracker's fs        
-    FSDataOutputStream out = 
-      FileSystem.create(getFs(), submitJobFile,
-                        new FsPermission(JOB_FILE_PERMISSION));
-
-    try {
-      job.writeXml(out);
-    } finally {
-      out.close();
-    }
-
-    //
-    // Now, actually submit the job (using the submit name)
-    //
-    JobStatus status = jobSubmitClient.submitJob(jobId);
-    if (status != null) {
-      return new NetworkedJob(status);
-    } else {
-      throw new IOException("Could not launch job");
-    }
-  }
-
-  private int writeOldSplits(JobConf job, 
-                             Path submitSplitFile) throws IOException {
-    InputSplit[] splits = 
-      job.getInputFormat().getSplits(job, job.getNumMapTasks());
-    // sort the splits into order based on size, so that the biggest
-    // go first
-    Arrays.sort(splits, new Comparator<InputSplit>() {
-      public int compare(InputSplit a, InputSplit b) {
-        try {
-          long left = a.getLength();
-          long right = b.getLength();
-          if (left == right) {
-            return 0;
-          } else if (left < right) {
-            return 1;
-          } else {
-            return -1;
-          }
-        } catch (IOException ie) {
-          throw new RuntimeException("Problem getting input split size",
-                                     ie);
-        }
-      }
-    });
-    DataOutputStream out = writeSplitsFileHeader(job, submitSplitFile, splits.length);
-    
-    try {
-      DataOutputBuffer buffer = new DataOutputBuffer();
-      RawSplit rawSplit = new RawSplit();
-      for(InputSplit split: splits) {
-        rawSplit.setClassName(split.getClass().getName());
-        buffer.reset();
-        split.write(buffer);
-        rawSplit.setDataLength(split.getLength());
-        rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
-        rawSplit.setLocations(split.getLocations());
-        rawSplit.write(out);
-      }
-    } finally {
-      out.close();
-    }
-    return splits.length;
-  }
-
-  private static class NewSplitComparator 
-    implements Comparator<org.apache.hadoop.mapreduce.InputSplit>{
-
-    @Override
-    public int compare(org.apache.hadoop.mapreduce.InputSplit o1,
-                       org.apache.hadoop.mapreduce.InputSplit o2) {
-      try {
-        long len1 = o1.getLength();
-        long len2 = o2.getLength();
-        if (len1 < len2) {
-          return 1;
-        } else if (len1 == len2) {
-          return 0;
-        } else {
-          return -1;
-        }
-      } catch (IOException ie) {
-        throw new RuntimeException("exception in compare", ie);
-      } catch (InterruptedException ie) {
-        throw new RuntimeException("exception in compare", ie);        
-      }
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  private <T extends org.apache.hadoop.mapreduce.InputSplit> 
-  int writeNewSplits(JobContext job, Path submitSplitFile
-                     ) throws IOException, InterruptedException, 
-                              ClassNotFoundException {
-    JobConf conf = job.getJobConf();
-    org.apache.hadoop.mapreduce.InputFormat<?,?> input =
-      ReflectionUtils.newInstance(job.getInputFormatClass(), job.getJobConf());
-    
-    List<org.apache.hadoop.mapreduce.InputSplit> splits = input.getSplits(job);
-    T[] array = (T[])
-      splits.toArray(new org.apache.hadoop.mapreduce.InputSplit[splits.size()]);
-
-    // sort the splits into order based on size, so that the biggest
-    // go first
-    Arrays.sort(array, new NewSplitComparator());
-    DataOutputStream out = writeSplitsFileHeader(conf, submitSplitFile, 
-                                                 array.length);
-    try {
-      if (array.length != 0) {
-        DataOutputBuffer buffer = new DataOutputBuffer();
-        RawSplit rawSplit = new RawSplit();
-        SerializationFactory factory = new SerializationFactory(conf);
-        Serializer<T> serializer = 
-          factory.getSerializer((Class<T>) array[0].getClass());
-        serializer.open(buffer);
-        for(T split: array) {
-          rawSplit.setClassName(split.getClass().getName());
-          buffer.reset();
-          serializer.serialize(split);
-          rawSplit.setDataLength(split.getLength());
-          rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
-          rawSplit.setLocations(split.getLocations());
-          rawSplit.write(out);
-        }
-        serializer.close();
-      }
-    } finally {
-      out.close();
-    }
-    return array.length;
-  }
-
   /** 
    * Checks if the job directory is clean and has all the required components 
    * for (re) starting the job
@@ -953,125 +554,6 @@
     return false;
   }
 
-  static class RawSplit implements Writable {
-    private String splitClass;
-    private BytesWritable bytes = new BytesWritable();
-    private String[] locations;
-    long dataLength;
-
-    public void setBytes(byte[] data, int offset, int length) {
-      bytes.set(data, offset, length);
-    }
-
-    public void setClassName(String className) {
-      splitClass = className;
-    }
-      
-    public String getClassName() {
-      return splitClass;
-    }
-      
-    public BytesWritable getBytes() {
-      return bytes;
-    }
-
-    public void clearBytes() {
-      bytes = null;
-    }
-      
-    public void setLocations(String[] locations) {
-      this.locations = locations;
-    }
-      
-    public String[] getLocations() {
-      return locations;
-    }
-      
-    public void readFields(DataInput in) throws IOException {
-      splitClass = Text.readString(in);
-      dataLength = in.readLong();
-      bytes.readFields(in);
-      int len = WritableUtils.readVInt(in);
-      locations = new String[len];
-      for(int i=0; i < len; ++i) {
-        locations[i] = Text.readString(in);
-      }
-    }
-      
-    public void write(DataOutput out) throws IOException {
-      Text.writeString(out, splitClass);
-      out.writeLong(dataLength);
-      bytes.write(out);
-      WritableUtils.writeVInt(out, locations.length);
-      for(int i = 0; i < locations.length; i++) {
-        Text.writeString(out, locations[i]);
-      }        
-    }
-
-    public long getDataLength() {
-      return dataLength;
-    }
-    public void setDataLength(long l) {
-      dataLength = l;
-    }
-    
-  }
-    
-  private static final int CURRENT_SPLIT_FILE_VERSION = 0;
-  private static final byte[] SPLIT_FILE_HEADER = "SPL".getBytes();
-
-  private DataOutputStream writeSplitsFileHeader(Configuration conf,
-                                                 Path filename,
-                                                 int length
-                                                 ) throws IOException {
-    // write the splits to a file for the job tracker
-    FileSystem fs = filename.getFileSystem(conf);
-    FSDataOutputStream out = 
-      FileSystem.create(fs, filename, new FsPermission(JOB_FILE_PERMISSION));
-    out.write(SPLIT_FILE_HEADER);
-    WritableUtils.writeVInt(out, CURRENT_SPLIT_FILE_VERSION);
-    WritableUtils.writeVInt(out, length);
-    return out;
-  }
-
-  /** Create the list of input splits and write them out in a file for
-   *the JobTracker. The format is:
-   * <format version>
-   * <numSplits>
-   * for each split:
-   *    <RawSplit>
-   * @param splits the input splits to write out
-   * @param out the stream to write to
-   */
-  private void writeOldSplitsFile(InputSplit[] splits, 
-                                  FSDataOutputStream out) throws IOException {
-  }
-
-  /**
-   * Read a splits file into a list of raw splits
-   * @param in the stream to read from
-   * @return the complete list of splits
-   * @throws IOException
-   */
-  static RawSplit[] readSplitFile(DataInput in) throws IOException {
-    byte[] header = new byte[SPLIT_FILE_HEADER.length];
-    in.readFully(header);
-    if (!Arrays.equals(SPLIT_FILE_HEADER, header)) {
-      throw new IOException("Invalid header on split file");
-    }
-    int vers = WritableUtils.readVInt(in);
-    if (vers != CURRENT_SPLIT_FILE_VERSION) {
-      throw new IOException("Unsupported split version " + vers);
-    }
-    int len = WritableUtils.readVInt(in);
-    RawSplit[] result = new RawSplit[len];
-    for(int i=0; i < len; ++i) {
-      result[i] = new RawSplit();
-      result[i].readFields(in);
-    }
-    return result;
-  }
-    
   /**
    * Get an {@link RunningJob} object to track an ongoing job.  Returns
    * null if the id does not correspond to any known job.
@@ -1082,12 +564,18 @@
    * @throws IOException
    */
   public RunningJob getJob(JobID jobid) throws IOException {
-    JobStatus status = jobSubmitClient.getJobStatus(jobid);
-    if (status != null) {
-      return new NetworkedJob(status);
-    } else {
-      return null;
+    try {
+      Job job = cluster.getJob(jobid);
+      if (job != null) {
+        JobStatus status = JobStatus.downgrade(job.getStatus());
+        if (status != null) {
+          return new NetworkedJob(status);
+        } 
+      }
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
     }
+    return null;
   }
 
   /**@deprecated Applications should rather use {@link #getJob(JobID)}. 
@@ -1105,7 +593,12 @@
    * @throws IOException
    */
   public TaskReport[] getMapTaskReports(JobID jobId) throws IOException {
-    return jobSubmitClient.getMapTaskReports(jobId);
+    try {
+      return TaskReport.downgradeArray(
+        cluster.getJob(jobId).getTaskReports(TaskType.MAP));
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
   }
   
   /**@deprecated Applications should rather use {@link #getMapTaskReports(JobID)}*/
@@ -1122,7 +615,12 @@
    * @throws IOException
    */    
   public TaskReport[] getReduceTaskReports(JobID jobId) throws IOException {
-    return jobSubmitClient.getReduceTaskReports(jobId);
+    try {
+      return TaskReport.downgradeArray(
+        cluster.getJob(jobId).getTaskReports(TaskType.REDUCE));
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
   }
 
   /**
@@ -1133,7 +631,12 @@
    * @throws IOException
    */    
   public TaskReport[] getCleanupTaskReports(JobID jobId) throws IOException {
-    return jobSubmitClient.getCleanupTaskReports(jobId);
+    try {
+      return TaskReport.downgradeArray(
+        cluster.getJob(jobId).getTaskReports(TaskType.JOB_CLEANUP));
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
   }
 
   /**
@@ -1144,9 +647,15 @@
    * @throws IOException
    */    
   public TaskReport[] getSetupTaskReports(JobID jobId) throws IOException {
-    return jobSubmitClient.getSetupTaskReports(jobId);
+    try {
+      return TaskReport.downgradeArray(
+        cluster.getJob(jobId).getTaskReports(TaskType.JOB_SETUP));
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
   }
 
+  
   /**@deprecated Applications should rather use {@link #getReduceTaskReports(JobID)}*/
   @Deprecated
   public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
@@ -1164,37 +673,13 @@
    */
   public void displayTasks(JobID jobId, String type, String state) 
   throws IOException {
-    TaskReport[] reports = new TaskReport[0];
-    if (type.equals("map")) {
-      reports = getMapTaskReports(jobId);
-    } else if (type.equals("reduce")) {
-      reports = getReduceTaskReports(jobId);
-    } else if (type.equals("setup")) {
-      reports = getSetupTaskReports(jobId);
-    } else if (type.equals("cleanup")) {
-      reports = getCleanupTaskReports(jobId);
-    }
-    for (TaskReport report : reports) {
-      TIPStatus status = report.getCurrentStatus();
-      if ((state.equals("pending") && status ==TIPStatus.PENDING) ||
-          (state.equals("running") && status ==TIPStatus.RUNNING) ||
-          (state.equals("completed") && status == TIPStatus.COMPLETE) ||
-          (state.equals("failed") && status == TIPStatus.FAILED) ||
-          (state.equals("killed") && status == TIPStatus.KILLED)) {
-        printTaskAttempts(report);
-      }
-    }
-  }
-  private void printTaskAttempts(TaskReport report) {
-    if (report.getCurrentStatus() == TIPStatus.COMPLETE) {
-      System.out.println(report.getSuccessfulTaskAttempt());
-    } else if (report.getCurrentStatus() == TIPStatus.RUNNING) {
-      for (TaskAttemptID t : 
-        report.getRunningTaskAttempts()) {
-        System.out.println(t);
-      }
+    try {
+      super.displayTasks(cluster.getJob(jobId), type, state);
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
     }
   }
+  
   /**
    * Get status information about the Map-Reduce cluster.
    *  
@@ -1203,7 +688,38 @@
    * @throws IOException
    */
   public ClusterStatus getClusterStatus() throws IOException {
-    return getClusterStatus(false);
+    try {
+      ClusterMetrics metrics = cluster.getClusterStatus();
+      return new ClusterStatus(metrics.getTaskTrackerCount(),
+        metrics.getBlackListedTaskTrackerCount(), cluster.getTaskTrackerExpiryInterval(),
+        metrics.getOccupiedMapSlots(),
+        metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
+        metrics.getReduceSlotCapacity(),
+        JobTracker.State.valueOf(cluster.getJobTrackerState().name()),
+        metrics.getDecommissionedTaskTrackerCount());
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
+  }
+
+  private  Collection<String> arrayToStringList(TaskTrackerInfo[] objs) {
+    Collection<String> list = new ArrayList<String>();
+    for (TaskTrackerInfo info: objs) {
+      list.add(info.getTaskTrackerName());
+    }
+    return list;
+  }
+
+  private  Collection<BlackListInfo> arrayToBlackListInfo(TaskTrackerInfo[] objs) {
+    Collection<BlackListInfo> list = new ArrayList<BlackListInfo>();
+    for (TaskTrackerInfo info: objs) {
+      BlackListInfo binfo = new BlackListInfo();
+      binfo.setTrackerName(info.getTaskTrackerName());
+      binfo.setReasonForBlackListing(info.getReasonForBlacklist());
+      binfo.setBlackListReport(info.getBlacklistReport());
+      list.add(binfo);
+    }
+    return list;
   }
 
   /**
@@ -1216,7 +732,17 @@
    * @throws IOException
    */
   public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
-    return jobSubmitClient.getClusterStatus(detailed);
+    try {
+      ClusterMetrics metrics = cluster.getClusterStatus();
+      return new ClusterStatus(arrayToStringList(cluster.getActiveTaskTrackers()),
+        arrayToBlackListInfo(cluster.getBlackListedTaskTrackers()),
+        cluster.getTaskTrackerExpiryInterval(), metrics.getOccupiedMapSlots(),
+        metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
+        metrics.getReduceSlotCapacity(), 
+        JobTracker.State.valueOf(cluster.getJobTrackerState().name()));
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
   }
     
 
@@ -1227,17 +753,13 @@
    * @throws IOException
    */
   public JobStatus[] jobsToComplete() throws IOException {
-    return jobSubmitClient.jobsToComplete();
-  }
-
-  private static void downloadProfile(TaskCompletionEvent e
-                                      ) throws IOException  {
-    URLConnection connection = 
-      new URL(getTaskLogURL(e.getTaskAttemptId(), e.getTaskTrackerHttp()) + 
-              "&filter=profile").openConnection();
-    InputStream in = connection.getInputStream();
-    OutputStream out = new FileOutputStream(e.getTaskAttemptId() + ".profile");
-    IOUtils.copyBytes(in, out, 64 * 1024, true);
+    List<JobStatus> stats = new ArrayList<JobStatus>();
+    for (JobStatus stat : getAllJobs()) {
+      if (!stat.isJobComplete()) {
+        stats.add(stat);
+      }
+    }
+    return stats.toArray(new JobStatus[0]);
   }
 
   /** 
@@ -1247,7 +769,16 @@
    * @throws IOException
    */
   public JobStatus[] getAllJobs() throws IOException {
-    return jobSubmitClient.getAllJobs();
+    try {
+      Job jobs[] = cluster.getAllJobs();
+      JobStatus[] stats = new JobStatus[jobs.length];
+      for (int i = 0; i < jobs.length; i++) {
+        stats[i] = JobStatus.downgrade(jobs[i].getStatus());
+      }
+      return stats;
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
   }
   
   /** 
@@ -1281,129 +812,13 @@
   public boolean monitorAndPrintJob(JobConf conf, 
                                     RunningJob job
   ) throws IOException, InterruptedException {
-    String lastReport = null;
-    TaskStatusFilter filter;
-    filter = getTaskOutputFilter(conf);
-    JobID jobId = job.getID();
-    LOG.info("Running job: " + jobId);
-    int eventCounter = 0;
-    boolean profiling = conf.getProfileEnabled();
-    Configuration.IntegerRanges mapRanges = conf.getProfileTaskRange(true);
-    Configuration.IntegerRanges reduceRanges = conf.getProfileTaskRange(false);
-
-    while (!job.isComplete()) {
-      Thread.sleep(1000);
-      String report = 
-        (" map " + StringUtils.formatPercent(job.mapProgress(), 0)+
-            " reduce " + 
-            StringUtils.formatPercent(job.reduceProgress(), 0));
-      if (!report.equals(lastReport)) {
-        LOG.info(report);
-        lastReport = report;
-      }
-
-      TaskCompletionEvent[] events = 
-        job.getTaskCompletionEvents(eventCounter); 
-      eventCounter += events.length;
-      for(TaskCompletionEvent event : events){
-        TaskCompletionEvent.Status status = event.getTaskStatus();
-        if (profiling && 
-            (status == TaskCompletionEvent.Status.SUCCEEDED ||
-                status == TaskCompletionEvent.Status.FAILED) &&
-                (event.isMap ? mapRanges : reduceRanges).
-                isIncluded(event.idWithinJob())) {
-          downloadProfile(event);
-        }
-        switch(filter){
-        case NONE:
-          break;
-        case SUCCEEDED:
-          if (event.getTaskStatus() == 
-            TaskCompletionEvent.Status.SUCCEEDED){
-            LOG.info(event.toString());
-            displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
-          }
-          break; 
-        case FAILED:
-          if (event.getTaskStatus() == 
-            TaskCompletionEvent.Status.FAILED){
-            LOG.info(event.toString());
-            // Displaying the task diagnostic information
-            TaskAttemptID taskId = event.getTaskAttemptId();
-            String[] taskDiagnostics = 
-              jobSubmitClient.getTaskDiagnostics(taskId); 
-            if (taskDiagnostics != null) {
-              for(String diagnostics : taskDiagnostics){
-                System.err.println(diagnostics);
-              }
-            }
-            // Displaying the task logs
-            displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
-          }
-          break; 
-        case KILLED:
-          if (event.getTaskStatus() == TaskCompletionEvent.Status.KILLED){
-            LOG.info(event.toString());
-          }
-          break; 
-        case ALL:
-          LOG.info(event.toString());
-          displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
-          break;
-        }
-      }
-    }
-    LOG.info("Job complete: " + jobId);
-    Counters counters = job.getCounters();
-    if (counters != null) {
-      counters.log(LOG);
-    }
-    return job.isSuccessful();
+    return ((NetworkedJob)job).monitorAndPrintJob();
   }
 
   static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
     return (baseUrl + "/tasklog?plaintext=true&taskid=" + taskId); 
   }
   
-  private static void displayTaskLogs(TaskAttemptID taskId, String baseUrl)
-    throws IOException {
-    // The tasktracker for a 'failed/killed' job might not be around...
-    if (baseUrl != null) {
-      // Construct the url for the tasklogs
-      String taskLogUrl = getTaskLogURL(taskId, baseUrl);
-      
-      // Copy tasks's stdout of the JobClient
-      getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stdout"), System.out);
-        
-      // Copy task's stderr to stderr of the JobClient 
-      getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stderr"), System.err);
-    }
-  }
-    
-  private static void getTaskLogs(TaskAttemptID taskId, URL taskLogUrl, 
-                                  OutputStream out) {
-    try {
-      URLConnection connection = taskLogUrl.openConnection();
-      BufferedReader input = 
-        new BufferedReader(new InputStreamReader(connection.getInputStream()));
-      BufferedWriter output = 
-        new BufferedWriter(new OutputStreamWriter(out));
-      try {
-        String logData = null;
-        while ((logData = input.readLine()) != null) {
-          if (logData.length() > 0) {
-            output.write(taskId + ": " + logData + "\n");
-            output.flush();
-          }
-        }
-      } finally {
-        input.close();
-      }
-    }catch(IOException ioe){
-      LOG.warn("Error reading task output" + ioe.getMessage()); 
-    }
-  }    
-
   static Configuration getConfiguration(String jobTrackerSpec)
   {
     Configuration conf = new Configuration();
@@ -1463,405 +878,12 @@
     return this.taskOutputFilter; 
   }
 
-  private String getJobPriorityNames() {
-    StringBuffer sb = new StringBuffer();
-    for (JobPriority p : JobPriority.values()) {
-      sb.append(p.name()).append(" ");
-    }
-    return sb.substring(0, sb.length()-1);
-  }
-  
-  /**
-   * Display usage of the command-line tool and terminate execution
-   */
-  private void displayUsage(String cmd) {
-    String prefix = "Usage: JobClient ";
-    String jobPriorityValues = getJobPriorityNames();
-    String taskTypes = "map, reduce, setup, cleanup";
-    String taskStates = "running, completed";
-    if("-submit".equals(cmd)) {
-      System.err.println(prefix + "[" + cmd + " <job-file>]");
-    } else if ("-status".equals(cmd) || "-kill".equals(cmd)) {
-      System.err.println(prefix + "[" + cmd + " <job-id>]");
-    } else if ("-counter".equals(cmd)) {
-      System.err.println(prefix + "[" + cmd + " <job-id> <group-name> <counter-name>]");
-    } else if ("-events".equals(cmd)) {
-      System.err.println(prefix + "[" + cmd + " <job-id> <from-event-#> <#-of-events>]");
-    } else if ("-history".equals(cmd)) {
-      System.err.println(prefix + "[" + cmd + " <jobOutputDir>]");
-    } else if ("-list".equals(cmd)) {
-      System.err.println(prefix + "[" + cmd + " [all]]");
-    } else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) {
-      System.err.println(prefix + "[" + cmd + " <task-id>]");
-    } else if ("-set-priority".equals(cmd)) {
-      System.err.println(prefix + "[" + cmd + " <job-id> <priority>]. " +
-          "Valid values for priorities are: " 
-          + jobPriorityValues); 
-    } else if ("-list-active-trackers".equals(cmd)) {
-      System.err.println(prefix + "[" + cmd + "]");
-    } else if ("-list-blacklisted-trackers".equals(cmd)) {
-      System.err.println(prefix + "[" + cmd + "]");
-    } else if ("-list-attempt-ids".equals(cmd)) {
-      System.err.println(prefix + "[" + cmd + 
-          " <job-id> <task-type> <task-state>]. " +
-          "Valid values for <task-type> are " + taskTypes + ". " +
-          "Valid values for <task-state> are " + taskStates);
-    } else {
-      System.err.printf(prefix + "<command> <args>\n");
-      System.err.printf("\t[-submit <job-file>]\n");
-      System.err.printf("\t[-status <job-id>]\n");
-      System.err.printf("\t[-counter <job-id> <group-name> <counter-name>]\n");
-      System.err.printf("\t[-kill <job-id>]\n");
-      System.err.printf("\t[-set-priority <job-id> <priority>]. " +
-                                      "Valid values for priorities are: " +
-                                      jobPriorityValues + "\n");
-      System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]\n");
-      System.err.printf("\t[-history <jobOutputDir>]\n");
-      System.err.printf("\t[-list [all]]\n");
-      System.err.printf("\t[-list-active-trackers]\n");
-      System.err.printf("\t[-list-blacklisted-trackers]\n");
-      System.err.println("\t[-list-attempt-ids <job-id> <task-type> " +
-      		"<task-state>]\n");
-      System.err.printf("\t[-kill-task <task-id>]\n");
-      System.err.printf("\t[-fail-task <task-id>]\n\n");
-      ToolRunner.printGenericCommandUsage(System.out);
-    }
-  }
-    
-  public int run(String[] argv) throws Exception {
-    int exitCode = -1;
-    if (argv.length < 1) {
-      displayUsage("");
-      return exitCode;
-    }    
-    // process arguments
-    String cmd = argv[0];
-    String submitJobFile = null;
-    String jobid = null;
-    String taskid = null;
-    String outputDir = null;
-    String counterGroupName = null;
-    String counterName = null;
-    String newPriority = null;
-    String taskType = null;
-    String taskState = null;
-    int fromEvent = 0;
-    int nEvents = 0;
-    boolean getStatus = false;
-    boolean getCounter = false;
-    boolean killJob = false;
-    boolean listEvents = false;
-    boolean viewHistory = false;
-    boolean viewAllHistory = false;
-    boolean listJobs = false;
-    boolean listAllJobs = false;
-    boolean listActiveTrackers = false;
-    boolean listBlacklistedTrackers = false;
-    boolean displayTasks = false;
-    boolean killTask = false;
-    boolean failTask = false;
-    boolean setJobPriority = false;
-
-    if ("-submit".equals(cmd)) {
-      if (argv.length != 2) {
-        displayUsage(cmd);
-        return exitCode;
-      }
-      submitJobFile = argv[1];
-    } else if ("-status".equals(cmd)) {
-      if (argv.length != 2) {
-        displayUsage(cmd);
-        return exitCode;
-      }
-      jobid = argv[1];
-      getStatus = true;
-    } else if("-counter".equals(cmd)) {
-      if (argv.length != 4) {
-        displayUsage(cmd);
-        return exitCode;
-      }
-      getCounter = true;
-      jobid = argv[1];
-      counterGroupName = argv[2];
-      counterName = argv[3];
-    } else if ("-kill".equals(cmd)) {
-      if (argv.length != 2) {
-        displayUsage(cmd);
-        return exitCode;
-      }
-      jobid = argv[1];
-      killJob = true;
-    } else if ("-set-priority".equals(cmd)) {
-      if (argv.length != 3) {
-        displayUsage(cmd);
-        return exitCode;
-      }
-      jobid = argv[1];
-      newPriority = argv[2];
-      try {
-        JobPriority jp = JobPriority.valueOf(newPriority); 
-      } catch (IllegalArgumentException iae) {
-        displayUsage(cmd);
-        return exitCode;
-      }
-      setJobPriority = true; 
-    } else if ("-events".equals(cmd)) {
-      if (argv.length != 4) {
-        displayUsage(cmd);
-        return exitCode;
-      }
-      jobid = argv[1];
-      fromEvent = Integer.parseInt(argv[2]);
-      nEvents = Integer.parseInt(argv[3]);
-      listEvents = true;
-    } else if ("-history".equals(cmd)) {
-      if (argv.length != 2 && !(argv.length == 3 && "all".equals(argv[1]))) {
-         displayUsage(cmd);
-         return exitCode;
-      }
-      viewHistory = true;
-      if (argv.length == 3 && "all".equals(argv[1])) {
-         viewAllHistory = true;
-         outputDir = argv[2];
-      } else {
-         outputDir = argv[1];
-      }
-    } else if ("-list".equals(cmd)) {
-      if (argv.length != 1 && !(argv.length == 2 && "all".equals(argv[1]))) {
-        displayUsage(cmd);
-        return exitCode;
-      }
-      if (argv.length == 2 && "all".equals(argv[1])) {
-        listAllJobs = true;
-      } else {
-        listJobs = true;
-      }
-    } else if("-kill-task".equals(cmd)) {
-      if(argv.length != 2) {
-        displayUsage(cmd);
-        return exitCode;
-      }
-      killTask = true;
-      taskid = argv[1];
-    } else if("-fail-task".equals(cmd)) {
-      if(argv.length != 2) {
-        displayUsage(cmd);
-        return exitCode;
-      }
-      failTask = true;
-      taskid = argv[1];
-    } else if ("-list-active-trackers".equals(cmd)) {
-      if (argv.length != 1) {
-        displayUsage(cmd);
-        return exitCode;
-      }
-      listActiveTrackers = true;
-    } else if ("-list-blacklisted-trackers".equals(cmd)) {
-      if (argv.length != 1) {
-        displayUsage(cmd);
-        return exitCode;
-      }
-      listBlacklistedTrackers = true;
-    } else if ("-list-attempt-ids".equals(cmd)) {
-      if (argv.length != 4) {
-        displayUsage(cmd);
-        return exitCode;
-      }
-      jobid = argv[1];
-      taskType = argv[2];
-      taskState = argv[3];
-      displayTasks = true;
-    } else {
-      displayUsage(cmd);
-      return exitCode;
-    }
-
-    // initialize JobClient
-    JobConf conf = null;
-    if (submitJobFile != null) {
-      conf = new JobConf(submitJobFile);
-    } else {
-      conf = new JobConf(getConf());
-    }
-    init(conf);
-        
-    // Submit the request
-    try {
-      if (submitJobFile != null) {
-        RunningJob job = submitJob(conf);
-        System.out.println("Created job " + job.getID());
-        exitCode = 0;
-      } else if (getStatus) {
-        RunningJob job = getJob(JobID.forName(jobid));
-        if (job == null) {
-          System.out.println("Could not find job " + jobid);
-        } else {
-          System.out.println();
-          System.out.println(job);
-          Counters counters = job.getCounters();
-          if (counters != null) {
-            System.out.println(counters);
-          } else {
-            System.out.println("Counters not available. Job is retired.");
-          }
-          exitCode = 0;
-        }
-      } else if (getCounter) {
-        RunningJob job = getJob(JobID.forName(jobid));
-        if (job == null) {
-          System.out.println("Could not find job " + jobid);
-        } else {
-          Counters counters = job.getCounters();
-          if (counters == null) {
-            System.out.println("Counters not available for retired job " + 
-                jobid);
-            exitCode = -1;
-          } else {
-            Group group = counters.getGroup(counterGroupName);
-            Counter counter = group.getCounterForName(counterName);
-            System.out.println(counter.getCounter());
-            exitCode = 0;
-          }
-        }
-      } else if (killJob) {
-        RunningJob job = getJob(JobID.forName(jobid));
-        if (job == null) {
-          System.out.println("Could not find job " + jobid);
-        } else {
-          job.killJob();
-          System.out.println("Killed job " + jobid);
-          exitCode = 0;
-        }
-      } else if (setJobPriority) {
-        RunningJob job = getJob(JobID.forName(jobid));
-        if (job == null) {
-          System.out.println("Could not find job " + jobid);
-        } else {
-          job.setJobPriority(newPriority);
-          System.out.println("Changed job priority.");
-          exitCode = 0;
-        } 
-      } else if (viewHistory) {
-        viewHistory(outputDir, viewAllHistory);
-        exitCode = 0;
-      } else if (listEvents) {
-        listEvents(JobID.forName(jobid), fromEvent, nEvents);
-        exitCode = 0;
-      } else if (listJobs) {
-        listJobs();
-        exitCode = 0;
-      } else if (listAllJobs) {
-        listAllJobs();
-        exitCode = 0;
-      } else if (listActiveTrackers) {
-        listActiveTrackers();
-        exitCode = 0;
-      } else if (listBlacklistedTrackers) {
-        listBlacklistedTrackers();
-        exitCode = 0;
-      } else if (displayTasks) {
-        displayTasks(JobID.forName(jobid), taskType, taskState);
-      } else if(killTask) {
-        if(jobSubmitClient.killTask(TaskAttemptID.forName(taskid), false)) {
-          System.out.println("Killed task " + taskid);
-          exitCode = 0;
-        } else {
-          System.out.println("Could not kill task " + taskid);
-          exitCode = -1;
-        }
-      } else if(failTask) {
-        if(jobSubmitClient.killTask(TaskAttemptID.forName(taskid), true)) {
-          System.out.println("Killed task " + taskid + " by failing it");
-          exitCode = 0;
-        } else {
-          System.out.println("Could not fail task " + taskid);
-          exitCode = -1;
-        }
-      }
-    } finally {
-      close();
-    }
-    return exitCode;
-  }
-
-  private void viewHistory(String outputDir, boolean all) 
-    throws IOException {
-    HistoryViewer historyViewer = new HistoryViewer(outputDir,
-                                        getConf(), all);
-    historyViewer.print();
+  protected long getCounter(org.apache.hadoop.mapreduce.Counters cntrs,
+      String counterGroupName, String counterName) throws IOException {
+    Counters counters = Counters.downgrade(cntrs);
+    return counters.findCounter(counterGroupName, counterName).getValue();
   }
   
-  /**
-   * List the events for the given job
-   * @param jobId the job id for the job's events to list
-   * @throws IOException
-   */
-  private void listEvents(JobID jobId, int fromEventId, int numEvents)
-    throws IOException {
-    TaskCompletionEvent[] events = 
-      jobSubmitClient.getTaskCompletionEvents(jobId, fromEventId, numEvents);
-    System.out.println("Task completion events for " + jobId);
-    System.out.println("Number of events (from " + fromEventId + 
-                       ") are: " + events.length);
-    for(TaskCompletionEvent event: events) {
-      System.out.println(event.getTaskStatus() + " " + event.getTaskAttemptId() + " " + 
-                         getTaskLogURL(event.getTaskAttemptId(), 
-                                       event.getTaskTrackerHttp()));
-    }
-  }
-
-  /**
-   * Dump a list of currently running jobs
-   * @throws IOException
-   */
-  private void listJobs() throws IOException {
-    JobStatus[] jobs = jobsToComplete();
-    if (jobs == null)
-      jobs = new JobStatus[0];
-
-    System.out.printf("%d jobs currently running\n", jobs.length);
-    displayJobList(jobs);
-  }
-    
-  /**
-   * Dump a list of all jobs submitted.
-   * @throws IOException
-   */
-  private void listAllJobs() throws IOException {
-    JobStatus[] jobs = getAllJobs();
-    if (jobs == null)
-      jobs = new JobStatus[0];
-    System.out.printf("%d jobs submitted\n", jobs.length);
-    System.out.printf("States are:\n\tRunning : 1\tSucceded : 2" +
-    "\tFailed : 3\tPrep : 4\n");
-    displayJobList(jobs);
-  }
-  
-  /**
-   * Display the list of active trackers
-   */
-  private void listActiveTrackers() throws IOException {
-    ClusterStatus c = jobSubmitClient.getClusterStatus(true);
-    Collection<String> trackers = c.getActiveTrackerNames();
-    for (String trackerName : trackers) {
-      System.out.println(trackerName);
-    }
-  }
-
-  /**
-   * Display the list of blacklisted trackers
-   */
-  private void listBlacklistedTrackers() throws IOException {
-    ClusterStatus c = jobSubmitClient.getClusterStatus(true);
-    Collection<BlackListInfo> trackers = c.getBlackListedTrackersInfo();
-    if(trackers.size() > 0) {
-      System.out.println("BlackListedNode \t Reason \t Report");
-    }
-    for (BlackListInfo tracker : trackers) {
-      System.out.println(tracker.toString());
-    }
-  }
-
   void displayJobList(JobStatus[] jobs) {
     System.out.printf("JobId\tState\tStartTime\tUserName\tPriority\tSchedulingInfo\n");
     for (JobStatus job : jobs) {
@@ -1878,7 +900,11 @@
    * @throws IOException
    */
   public int getDefaultMaps() throws IOException {
-    return getClusterStatus().getMaxMapTasks();
+    try {
+      return cluster.getClusterStatus().getMapSlotCapacity();
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
   }
 
   /**
@@ -1888,7 +914,11 @@
    * @throws IOException
    */
   public int getDefaultReduces() throws IOException {
-    return getClusterStatus().getMaxReduceTasks();
+    try {
+      return cluster.getClusterStatus().getReduceSlotCapacity();
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
   }
 
   /**
@@ -1897,12 +927,54 @@
    * @return the system directory where job-specific files are to be placed.
    */
   public Path getSystemDir() {
-    if (sysDir == null) {
-      sysDir = new Path(jobSubmitClient.getSystemDir());
+    try {
+      return cluster.getSystemDir();
+    } catch (IOException ioe) {
+      return null;
+    } catch (InterruptedException ie) {
+      return null;
+    }
+  }
+
+  private JobQueueInfo[] getJobQueueInfoArray(QueueInfo[] queues) 
+  throws IOException {
+    JobQueueInfo[] ret = new JobQueueInfo[queues.length];
+    for (int i = 0; i < queues.length; i++) {
+      ret[i] = new JobQueueInfo(queues[i]);
+    }
+    return ret;
+  }
+
+  /**
+   * Returns an array of queue information objects about root level queues
+   * configured
+   *
+   * @return the array of root level JobQueueInfo objects
+   * @throws IOException
+   */
+  public JobQueueInfo[] getRootQueues() throws IOException {
+    try {
+      return getJobQueueInfoArray(cluster.getRootQueues());
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
+  }
+
+  /**
+   * Returns an array of queue information objects about immediate children
+   * of queue queueName.
+   * 
+   * @param queueName
+   * @return the array of immediate children JobQueueInfo objects
+   * @throws IOException
+   */
+  public JobQueueInfo[] getChildQueues(String queueName) throws IOException {
+    try {
+      return getJobQueueInfoArray(cluster.getChildQueues(queueName));
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
     }
-    return sysDir;
   }
-  
   
   /**
    * Return an array of queue information objects about all the Job Queues
@@ -1912,7 +984,11 @@
    * @throws IOException
    */
   public JobQueueInfo[] getQueues() throws IOException {
-    return jobSubmitClient.getQueues();
+    try {
+      return getJobQueueInfoArray(cluster.getQueues());
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
   }
   
   /**
@@ -1924,7 +1000,17 @@
    */
   
   public JobStatus[] getJobsFromQueue(String queueName) throws IOException {
-    return jobSubmitClient.getJobsFromQueue(queueName);
+    try {
+      org.apache.hadoop.mapreduce.JobStatus[] stats = 
+        cluster.getQueue(queueName).getJobStatuses();
+      JobStatus[] ret = new JobStatus[stats.length];
+      for (int i = 0 ; i < stats.length; i++ ) {
+        ret[i] = JobStatus.downgrade(stats[i]);
+      }
+      return ret;
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
   }
   
   /**
@@ -1935,7 +1021,11 @@
    * @throws IOException
    */
   public JobQueueInfo getQueueInfo(String queueName) throws IOException {
-    return jobSubmitClient.getQueueInfo(queueName);
+    try {
+      return new JobQueueInfo(cluster.getQueue(queueName));
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
   }
   
   /**
@@ -1944,7 +1034,17 @@
    * @throws IOException
    */
   public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException {
-    return jobSubmitClient.getQueueAclsForCurrentUser();
+    try {
+      org.apache.hadoop.mapreduce.QueueAclsInfo[] acls = 
+        cluster.getQueueAclsForCurrentUser();
+      QueueAclsInfo[] ret = new QueueAclsInfo[acls.length];
+      for (int i = 0 ; i < acls.length; i++ ) {
+        ret[i] = QueueAclsInfo.downgrade(acls[i]);
+      }
+      return ret;
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
   }
 
   /**



Mime
View raw message