hadoop-mapreduce-commits mailing list archives

From acmur...@apache.org
Subject svn commit: r816496 [4/4] - in /hadoop/mapreduce/trunk: ./ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/ src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ src...
Date Fri, 18 Sep 2009 07:04:45 GMT
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/QueueInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/QueueInfo.java?rev=816496&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/QueueInfo.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/QueueInfo.java Fri Sep 18 07:04:42 2009
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Class that contains information about the job queues maintained by the
+ * Hadoop Map/Reduce framework.
+ */
+public class QueueInfo implements Writable {
+
+  private String queueName = "";
+  
+  // The scheduling information is read back as a String;
+  // once set, the original object cannot be recovered.
+  private String schedulingInfo; 
+  
+  private QueueState queueState;
+  
+  // Jobs submitted to the queue
+  private JobStatus[] stats;
+  
+  private List<QueueInfo> children;
+
+  private Properties props;
+
+  /**
+   * Default constructor for QueueInfo.
+   * 
+   */
+  public QueueInfo() {
+    children = new ArrayList<QueueInfo>();
+  }
+  
+  /**
+   * Construct a new QueueInfo object using the queue name and the
+   * scheduling information passed.
+   * 
+   * @param queueName Name of the job queue
+   * @param schedulingInfo Scheduling Information associated with the job
+   * queue
+   */
+  public QueueInfo(String queueName, String schedulingInfo) {
+    this.queueName = queueName;
+    this.schedulingInfo = schedulingInfo;
+    // make it running by default.
+    this.queueState = QueueState.RUNNING;
+    children = new ArrayList<QueueInfo>();
+  }
+  
+  /**
+   * Construct a new QueueInfo from the queue name, scheduling information,
+   * queue state and the statuses of jobs submitted to the queue.
+   * 
+   * @param queueName name of the job queue
+   * @param schedulingInfo scheduling information associated with the queue
+   * @param state state of the queue
+   * @param stats statuses of the jobs submitted to the queue
+   */
+  public QueueInfo(String queueName, String schedulingInfo, QueueState state,
+                   JobStatus[] stats) {
+    this.queueName = queueName;
+    this.schedulingInfo = schedulingInfo;
+    this.queueState = state;
+    this.stats = stats;
+    // Initialize children so that write() never sees a null list.
+    children = new ArrayList<QueueInfo>();
+  }
+
+  /**
+   * Set the queue name.
+   * 
+   * @param queueName Name of the job queue.
+   */
+  protected void setQueueName(String queueName) {
+    this.queueName = queueName;
+  }
+
+  /**
+   * Get the queue name.
+   * 
+   * @return queue name
+   */
+  public String getQueueName() {
+    return queueName;
+  }
+
+  /**
+   * Set the scheduling information associated with this job queue.
+   * 
+   * @param schedulingInfo scheduling information for the queue
+   */
+  protected void setSchedulingInfo(String schedulingInfo) {
+    this.schedulingInfo = schedulingInfo;
+  }
+
+  /**
+   * Gets the scheduling information associated with this job queue.
+   * Returns <b>"N/A"</b> if nothing has been set.
+   * 
+   * @return scheduling information for this job queue
+   */
+  public String getSchedulingInfo() {
+    if (schedulingInfo != null) {
+      return schedulingInfo;
+    } else {
+      return "N/A";
+    }
+  }
+  
+  /**
+   * Set the state of the queue
+   * @param state state of the queue.
+   */
+  protected void setState(QueueState state) {
+    queueState = state;
+  }
+  
+  /**
+   * Return the queue state
+   * @return the queue state.
+   */
+  public QueueState getState() {
+    return queueState;
+  }
+  
+  protected void setJobStatuses(JobStatus[] stats) {
+    this.stats = stats;
+  }
+
+  /** 
+   * Get immediate children.
+   * 
+   * @return list of QueueInfo
+   */
+  public List<QueueInfo> getQueueChildren() {
+    return children;
+  }
+
+  protected void setQueueChildren(List<QueueInfo> children) {
+    this.children =  children; 
+  }
+
+  /**
+   * Get properties.
+   * 
+   * @return Properties
+   */
+  public Properties getProperties() {
+    return props;
+  }
+
+  protected void setProperties(Properties props) {
+    this.props = props;
+  }
+
+  /**
+   * Get the jobs submitted to queue
+   * @return list of JobStatus for the submitted jobs
+   */
+  public JobStatus[] getJobStatuses() {
+    return stats;
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    queueName = Text.readString(in);
+    queueState = WritableUtils.readEnum(in, QueueState.class);
+    schedulingInfo = Text.readString(in);
+    int length = in.readInt();
+    stats = new JobStatus[length];
+    for (int i = 0; i < length; i++) {
+      stats[i] = new JobStatus();
+      stats[i].readFields(in);
+    }
+    int count = in.readInt();
+    children.clear();
+    for (int i = 0; i < count; i++) {
+      QueueInfo childQueueInfo = new QueueInfo();
+      childQueueInfo.readFields(in);
+      children.add(childQueueInfo);
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, queueName);
+    WritableUtils.writeEnum(out, queueState);
+    
+    if (schedulingInfo != null) {
+      Text.writeString(out, schedulingInfo);
+    } else {
+      Text.writeString(out, "N/A");
+    }
+    // stats may never have been set; write an empty list in that case.
+    if (stats == null) {
+      out.writeInt(0);
+    } else {
+      out.writeInt(stats.length);
+      for (JobStatus stat : stats) {
+        stat.write(out);
+      }
+    }
+    out.writeInt(children.size());
+    for (QueueInfo childQueueInfo : children) {
+      childQueueInfo.write(out);
+    }
+  }
+}
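
QueueInfo implements Writable, so it can be round-tripped through a byte
stream. A minimal sketch, assuming only standard java.io streams; the queue
name and scheduling string below are made up for illustration:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.mapreduce.JobStatus;
    import org.apache.hadoop.mapreduce.QueueInfo;
    import org.apache.hadoop.mapreduce.QueueState;

    public class QueueInfoRoundTrip {
      public static void main(String[] args) throws IOException {
        // Pass an empty JobStatus array: stats has no default value.
        QueueInfo original = new QueueInfo("default", "capacity: 100%",
            QueueState.RUNNING, new JobStatus[0]);

        // Serialize to an in-memory buffer.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize into a fresh instance.
        QueueInfo copy = new QueueInfo();
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(copy.getQueueName() + " [" + copy.getState()
            + "] " + copy.getSchedulingInfo());
      }
    }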

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/QueueState.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/QueueState.java?rev=816496&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/QueueState.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/QueueState.java Fri Sep 18 07:04:42 2009
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+/**
+ * Used to represent the state of a job queue.
+ */
+public enum QueueState {
+  STOPPED("stopped"), RUNNING("running");
+  
+  private final String str;
+  
+  QueueState(String str) {
+    this.str = str;
+  }
+  
+  @Override
+  public String toString() {
+    return str;
+  }
+}

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCompletionEvent.java?rev=816496&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCompletionEvent.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCompletionEvent.java Fri Sep 18 07:04:42 2009
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * This is used to track task completion events on the
+ * job tracker.
+ */
+public class TaskCompletionEvent implements Writable {
+  public static enum Status {FAILED, KILLED, SUCCEEDED, OBSOLETE, TIPFAILED}
+    
+  private int eventId; 
+  private String taskTrackerHttp;
+  private int taskRunTime; // using int since runtime is the time difference
+  private TaskAttemptID taskId;
+  Status status; 
+  boolean isMap = false;
+  private int idWithinJob;
+  public static final TaskCompletionEvent[] EMPTY_ARRAY = 
+    new TaskCompletionEvent[0];
+  /**
+   * Default constructor for Writable.
+   *
+   */
+  public TaskCompletionEvent(){
+    taskId = new TaskAttemptID();
+  }
+
+  /**
+   * Constructor. eventId should be created externally and incremented
+   * per event for each job. 
+   * @param eventId event id; should be unique and assigned
+   *  incrementally, starting from 0. 
+   * @param taskId task id
+   * @param idWithinJob index of the task within the job
+   * @param isMap whether the task is a map task
+   * @param status task's status 
+   * @param taskTrackerHttp task tracker's host:port for http. 
+   */
+  public TaskCompletionEvent(int eventId, 
+                             TaskAttemptID taskId,
+                             int idWithinJob,
+                             boolean isMap,
+                             Status status, 
+                             String taskTrackerHttp){
+      
+    this.taskId = taskId;
+    this.idWithinJob = idWithinJob;
+    this.isMap = isMap;
+    this.eventId = eventId; 
+    this.status = status; 
+    this.taskTrackerHttp = taskTrackerHttp;
+  }
+  /**
+   * Returns event Id. 
+   * @return event id
+   */
+  public int getEventId() {
+    return eventId;
+  }
+  
+  /**
+   * Returns task id. 
+   * @return task id
+   */
+  public TaskAttemptID getTaskAttemptId() {
+    return taskId;
+  }
+  
+  /**
+   * Returns the {@link Status} of the task, e.g. SUCCEEDED or FAILED.
+   * @return task status
+   */
+  public Status getStatus() {
+    return status;
+  }
+  /**
+   * http location of the tasktracker where this task ran. 
+   * @return http location of tasktracker user logs
+   */
+  public String getTaskTrackerHttp() {
+    return taskTrackerHttp;
+  }
+
+  /**
+   * Returns time (in millisec) the task took to complete. 
+   */
+  public int getTaskRunTime() {
+    return taskRunTime;
+  }
+
+  /**
+   * Set the task completion time
+   * @param taskCompletionTime time (in millisec) the task took to complete
+   */
+  protected void setTaskRunTime(int taskCompletionTime) {
+    this.taskRunTime = taskCompletionTime;
+  }
+
+  /**
+   * Set event id; ids should be assigned incrementally, starting from 0. 
+   * @param eventId event id
+   */
+  protected void setEventId(int eventId) {
+    this.eventId = eventId;
+  }
+
+  /**
+   * Sets task id. 
+   * @param taskId task id
+   */
+  protected void setTaskAttemptId(TaskAttemptID taskId) {
+    this.taskId = taskId;
+  }
+  
+  /**
+   * Set task status. 
+   * @param status task status
+   */
+  protected void setTaskStatus(Status status) {
+    this.status = status;
+  }
+  
+  /**
+   * Set task tracker http location. 
+   * @param taskTrackerHttp task tracker's host:port for http
+   */
+  protected void setTaskTrackerHttp(String taskTrackerHttp) {
+    this.taskTrackerHttp = taskTrackerHttp;
+  }
+    
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder(); 
+    buf.append("Task Id : "); 
+    buf.append(taskId); 
+    buf.append(", Status : ");  
+    buf.append(status.name());
+    return buf.toString();
+  }
+    
+  @Override
+  public boolean equals(Object o) {
+    if(o == null)
+      return false;
+    if(o.getClass().equals(this.getClass())) {
+      TaskCompletionEvent event = (TaskCompletionEvent) o;
+      return this.isMap == event.isMapTask() 
+             && this.eventId == event.getEventId()
+             && this.idWithinJob == event.idWithinJob() 
+             && this.status.equals(event.getStatus())
+             && this.taskId.equals(event.getTaskAttemptId()) 
+             && this.taskRunTime == event.getTaskRunTime()
+             && this.taskTrackerHttp.equals(event.getTaskTrackerHttp());
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return toString().hashCode(); 
+  }
+
+  public boolean isMapTask() {
+    return isMap;
+  }
+    
+  public int idWithinJob() {
+    return idWithinJob;
+  }
+  //////////////////////////////////////////////
+  // Writable
+  //////////////////////////////////////////////
+  public void write(DataOutput out) throws IOException {
+    taskId.write(out); 
+    WritableUtils.writeVInt(out, idWithinJob);
+    out.writeBoolean(isMap);
+    WritableUtils.writeEnum(out, status); 
+    WritableUtils.writeString(out, taskTrackerHttp);
+    WritableUtils.writeVInt(out, taskRunTime);
+    WritableUtils.writeVInt(out, eventId);
+  }
+  
+  public void readFields(DataInput in) throws IOException {
+    taskId.readFields(in); 
+    idWithinJob = WritableUtils.readVInt(in);
+    isMap = in.readBoolean();
+    status = WritableUtils.readEnum(in, Status.class);
+    taskTrackerHttp = WritableUtils.readString(in);
+    taskRunTime = WritableUtils.readVInt(in);
+    eventId = WritableUtils.readVInt(in);
+  }
+}
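
A minimal construction sketch; the attempt ID string and tracker URL are
invented for illustration, assuming TaskAttemptID.forName parses the usual
attempt id format:

    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.TaskCompletionEvent;

    public class TaskCompletionEventExample {
      public static void main(String[] args) {
        TaskAttemptID attempt =
            TaskAttemptID.forName("attempt_200909180704_0001_m_000000_0");
        TaskCompletionEvent event = new TaskCompletionEvent(
            0,           // eventId: unique per job, assigned from 0
            attempt,
            0,           // idWithinJob
            true,        // isMap
            TaskCompletionEvent.Status.SUCCEEDED,
            "http://tracker-host:50060");  // tasktracker http location
        // Prints "Task Id : attempt_..., Status : SUCCEEDED"
        System.out.println(event);
      }
    }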

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskReport.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskReport.java?rev=816496&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskReport.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskReport.java Fri Sep 18 07:04:42 2009
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.TIPStatus;
+
+/** A report on the state of a task. */
+public class TaskReport implements Writable {
+  private TaskID taskid;
+  private float progress;
+  private String state;
+  private String[] diagnostics;
+  private long startTime; 
+  private long finishTime; 
+  private Counters counters;
+  private TIPStatus currentStatus;
+  
+  private Collection<TaskAttemptID> runningAttempts = 
+    new ArrayList<TaskAttemptID>();
+  private TaskAttemptID successfulAttempt = new TaskAttemptID();
+  public TaskReport() {
+    taskid = new TaskID();
+  }
+  
+  /**
+   * Creates a new TaskReport object.
+   * @param taskid id of the task
+   * @param progress amount completed, between zero and one
+   * @param state most recent state, reported by the Reporter
+   * @param diagnostics list of diagnostic messages
+   * @param currentStatus current status of the task
+   * @param startTime start time of the task
+   * @param finishTime finish time of the task
+   * @param counters counters of the task
+   */
+  public TaskReport(TaskID taskid, float progress, String state,
+             String[] diagnostics, TIPStatus currentStatus, 
+             long startTime, long finishTime,
+             Counters counters) {
+    this.taskid = taskid;
+    this.progress = progress;
+    this.state = state;
+    this.diagnostics = diagnostics;
+    this.currentStatus = currentStatus;
+    this.startTime = startTime; 
+    this.finishTime = finishTime;
+    this.counters = counters;
+  }
+    
+  /** The id of the task. */
+  public TaskID getTaskId() { return taskid; }
+
+  /** The amount completed, between zero and one. */
+  public float getProgress() { return progress; }
+  
+  /** The most recent state, reported by the Reporter. */
+  public String getState() { return state; }
+  
+  /** A list of error messages. */
+  public String[] getDiagnostics() { return diagnostics; }
+  
+  /** A table of counters. */
+  public Counters getTaskCounters() { return counters; }
+  
+  /** The current status */
+  public TIPStatus getCurrentStatus() {
+    return currentStatus;
+  }
+  
+  /**
+   * Get finish time of task. 
+   * @return finish time of the task, or 0 if not set
+   */
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  /** 
+   * set finish time of task. 
+   * @param finishTime finish time of task. 
+   */
+  protected void setFinishTime(long finishTime) {
+    this.finishTime = finishTime;
+  }
+
+  /**
+   * Get start time of task. 
+   * @return start time of the task, or 0 if not set
+   */
+  public long getStartTime() {
+    return startTime;
+  }
+
+  /** 
+   * set start time of the task. 
+   */ 
+  protected void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  /** 
+   * set successful attempt ID of the task. 
+   */ 
+  protected void setSuccessfulAttemptId(TaskAttemptID t) {
+    successfulAttempt = t;
+  }
+  
+  /**
+   * Get the attempt ID that took this task to completion
+   */
+  public TaskAttemptID getSuccessfulTaskAttemptId() {
+    return successfulAttempt;
+  }
+  
+  /** 
+   * set running attempt(s) of the task. 
+   */ 
+  protected void setRunningTaskAttemptIds(
+      Collection<TaskAttemptID> runningAttempts) {
+    this.runningAttempts = runningAttempts;
+  }
+  
+  /**
+   * Get the running task attempt IDs for this task
+   */
+  public Collection<TaskAttemptID> getRunningTaskAttemptIds() {
+    return runningAttempts;
+  }
+
+
+  @Override
+  public boolean equals(Object o) {
+    if(o == null)
+      return false;
+    if(o.getClass().equals(this.getClass())) {
+      TaskReport report = (TaskReport) o;
+      return counters.equals(report.getTaskCounters())
+             && Arrays.toString(this.diagnostics)
+                      .equals(Arrays.toString(report.getDiagnostics()))
+             && this.finishTime == report.getFinishTime()
+             && this.progress == report.getProgress()
+             && this.startTime == report.getStartTime()
+             && this.state.equals(report.getState())
+             && this.taskid.equals(report.getTaskId());
+    }
+    return false; 
+  }
+
+  @Override
+  public int hashCode() {
+    return (counters.toString() + Arrays.toString(this.diagnostics) 
+            + this.finishTime + this.progress + this.startTime + this.state 
+            + this.taskid.toString()).hashCode();
+  }
+  //////////////////////////////////////////////
+  // Writable
+  //////////////////////////////////////////////
+  public void write(DataOutput out) throws IOException {
+    taskid.write(out);
+    out.writeFloat(progress);
+    Text.writeString(out, state);
+    out.writeLong(startTime);
+    out.writeLong(finishTime);
+    WritableUtils.writeStringArray(out, diagnostics);
+    counters.write(out);
+    WritableUtils.writeEnum(out, currentStatus);
+    if (currentStatus == TIPStatus.RUNNING) {
+      WritableUtils.writeVInt(out, runningAttempts.size());
+      for (TaskAttemptID t : runningAttempts) {
+        t.write(out);
+      }
+    } else if (currentStatus == TIPStatus.COMPLETE) {
+      successfulAttempt.write(out);
+    }
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    this.taskid.readFields(in);
+    this.progress = in.readFloat();
+    this.state = Text.readString(in);
+    this.startTime = in.readLong(); 
+    this.finishTime = in.readLong();
+    
+    diagnostics = WritableUtils.readStringArray(in);
+    counters = new Counters();
+    counters.readFields(in);
+    currentStatus = WritableUtils.readEnum(in, TIPStatus.class);
+    if (currentStatus == TIPStatus.RUNNING) {
+      int num = WritableUtils.readVInt(in);    
+      for (int i = 0; i < num; i++) {
+        TaskAttemptID t = new TaskAttemptID();
+        t.readFields(in);
+        runningAttempts.add(t);
+      }
+    } else if (currentStatus == TIPStatus.COMPLETE) {
+      successfulAttempt.readFields(in);
+    }
+  }
+}
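
A minimal construction sketch; the task id, progress and times are invented
for illustration (a real report comes from the framework, not client code):

    import org.apache.hadoop.mapred.TIPStatus;
    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.mapreduce.TaskID;
    import org.apache.hadoop.mapreduce.TaskReport;

    public class TaskReportExample {
      public static void main(String[] args) {
        TaskReport report = new TaskReport(
            TaskID.forName("task_200909180704_0001_m_000000"),
            0.5f,                        // progress, between zero and one
            "RUNNING",                   // most recent reported state
            new String[0],               // no diagnostics yet
            TIPStatus.RUNNING,
            System.currentTimeMillis(),  // startTime
            0L,                          // finishTime: not finished yet
            new Counters());
        System.out.println(report.getTaskId() + " is "
            + (report.getProgress() * 100) + "% complete");
      }
    }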

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskTrackerInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskTrackerInfo.java?rev=816496&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskTrackerInfo.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskTrackerInfo.java Fri Sep 18 07:04:42 2009
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Information about a TaskTracker.
+ */
+public class TaskTrackerInfo implements Writable {
+  String name;
+  boolean isBlacklisted = false;
+  String reasonForBlacklist = "";
+  String blacklistReport = "";
+  
+  public TaskTrackerInfo() {
+  }
+
+  /** Construct an active tracker. */
+  public TaskTrackerInfo(String name) {
+    this.name = name;
+  }
+
+  /** Construct a blacklisted tracker. */
+  public TaskTrackerInfo(String name, String reasonForBlacklist,
+      String report) {
+    this.name = name;
+    this.isBlacklisted = true;
+    this.reasonForBlacklist = reasonForBlacklist;
+    this.blacklistReport = report;
+  }
+
+  /**
+   * Gets the tasktracker's name.
+   * 
+   * @return tracker's name.
+   */
+  public String getTaskTrackerName() {
+    return name;
+  }
+  
+  /**
+   * Whether the tracker is blacklisted.
+   * @return true if the tracker is blacklisted,
+   *         false otherwise
+   */
+  public boolean isBlacklisted() {
+    return isBlacklisted;
+  }
+  
+  /**
+   * Gets the reason for which the tasktracker was blacklisted.
+   * 
+   * @return reason for which the tracker was blacklisted
+   */
+  public String getReasonForBlacklist() {
+    return reasonForBlacklist;
+  }
+
+  /**
+   * Gets a descriptive report about why the tasktracker was blacklisted.
+   * 
+   * @return report describing why the tasktracker was blacklisted.
+   */
+  public String getBlacklistReport() {
+    return blacklistReport;
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    name = Text.readString(in);
+    isBlacklisted = in.readBoolean();
+    reasonForBlacklist = Text.readString(in);
+    blacklistReport = Text.readString(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, name);
+    out.writeBoolean(isBlacklisted);
+    Text.writeString(out, reasonForBlacklist);
+    Text.writeString(out, blacklistReport);
+  }
+
+}
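
A small sketch of how a blacklisted-tracker record reads back; the tracker
name and reason strings are invented for illustration:

    import org.apache.hadoop.mapreduce.TaskTrackerInfo;

    public class TrackerInfoExample {
      public static void main(String[] args) {
        TaskTrackerInfo tracker = new TaskTrackerInfo(
            "tracker_host1:50060",               // hypothetical tracker name
            "EXCEEDING_FAILURES",                // hypothetical reason
            "4 task failures in the last hour");
        if (tracker.isBlacklisted()) {
          System.out.println(tracker.getTaskTrackerName() + "\t"
              + tracker.getReasonForBlacklist());
        }
      }
    }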

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobcontrol/ControlledJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobcontrol/ControlledJob.java?rev=816496&r1=816495&r2=816496&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobcontrol/ControlledJob.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobcontrol/ControlledJob.java Fri Sep 18 07:04:42 2009
@@ -228,7 +228,7 @@
     return this.state == State.READY;
   }
 
-  public void killJob() throws IOException {
+  public void killJob() throws IOException, InterruptedException {
     job.killJob();
   }
   
@@ -236,7 +236,7 @@
    * Check the state of this running job. The state may 
    * remain the same, become SUCCESS or FAILED.
    */
-  private void checkRunningState() {
+  private void checkRunningState() throws IOException, InterruptedException {
     try {
       if (job.isComplete()) {
         if (job.isSuccessful()) {
@@ -261,7 +261,7 @@
    * Check and update the state of this job. The state changes  
    * depending on its current state and the states of the depending jobs.
    */
-   synchronized State checkState() {
+   synchronized State checkState() throws IOException, InterruptedException {
     if (this.state == State.RUNNING) {
       checkRunningState();
     }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java?rev=816496&r1=816495&r2=816496&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java Fri Sep 18 07:04:42 2009
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce.lib.jobcontrol;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Hashtable;
@@ -206,7 +207,8 @@
     }
   }
 	
-  synchronized private void checkRunningJobs() {
+  synchronized private void checkRunningJobs() 
+      throws IOException, InterruptedException {
 		
     Map<String, ControlledJob> oldJobs = null;
     oldJobs = this.runningJobs;
@@ -218,7 +220,8 @@
     }
   }
 	
-  synchronized private void checkWaitingJobs() {
+  synchronized private void checkWaitingJobs() 
+      throws IOException, InterruptedException {
     Map<String, ControlledJob> oldJobs = null;
     oldJobs = this.waitingJobs;
     this.waitingJobs = new Hashtable<String, ControlledJob>();
@@ -265,9 +268,13 @@
 					
         }
       }
-      checkRunningJobs();	
-      checkWaitingJobs();		
-      startReadyJobs();		
+      try {
+        checkRunningJobs();	
+        checkWaitingJobs();
+        startReadyJobs();
+      } catch (Exception e) {
+  	    this.runnerState = ThreadState.STOPPED;
+      }
       if (this.runnerState != ThreadState.RUNNING && 
           this.runnerState != ThreadState.SUSPENDED) {
         break;
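
For context, a sketch of how a client typically drives JobControl: jobs are
wrapped in ControlledJob, chained by dependency, and the loop above runs on
its own thread. The configuration and jobs here are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Cluster;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

    public class JobControlDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Cluster cluster = new Cluster(conf);

        ControlledJob first =
            new ControlledJob(Job.getInstance(cluster, conf), null);
        ControlledJob second =
            new ControlledJob(Job.getInstance(cluster, conf), null);
        second.addDependingJob(first); // second starts after first succeeds

        JobControl control = new JobControl("example-group");
        control.addJob(first);
        control.addJob(second);

        new Thread(control).start();   // JobControl implements Runnable
        while (!control.allFinished()) {
          Thread.sleep(5000);          // poll until all jobs complete or fail
        }
        control.stop();
      }
    }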

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java?rev=816496&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java Fri Sep 18 07:04:42 2009
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.QueueAclsInfo;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.State;
+
+/** 
+ * Protocol that a JobClient and the central JobTracker use to communicate.  The
+ * JobClient can use these methods to submit a Job for execution, and learn about
+ * the current system status.
+ */ 
+public interface ClientProtocol extends VersionedProtocol {
+  /* 
+   * Changing the versionID to 2L since the getTaskCompletionEvents method
+   * has changed.
+   * Changed to 4 since killTask(String,boolean) is added
+   * Version 4: added jobtracker state to ClusterStatus
+   * Version 5: max_tasks in ClusterStatus is replaced by
+   *            max_map_tasks and max_reduce_tasks for HADOOP-1274
+   * Version 6: change the counters representation for HADOOP-2248
+   * Version 7: added getAllJobs for HADOOP-2487
+   * Version 8: change {job|task}id's to use corresponding objects rather that strings.
+   * Version 9: change the counter representation for HADOOP-1915
+   * Version 10: added getSystemDir for HADOOP-3135
+   * Version 11: changed JobProfile to include the queue name for HADOOP-3698
+   * Version 12: Added getCleanupTaskReports and 
+   *             cleanupProgress to JobStatus as part of HADOOP-3150
+   * Version 13: Added getJobQueueInfos and getJobQueueInfo(queue name)
+   *             and getAllJobs(queue) as a part of HADOOP-3930
+   * Version 14: Added setPriority for HADOOP-4124
+   * Version 15: Added KILLED status to JobStatus as part of HADOOP-3924            
+   * Version 16: Added getSetupTaskReports and 
+   *             setupProgress to JobStatus as part of HADOOP-4261           
+   * Version 17: getClusterStatus returns the amount of memory used by 
+   *             the server. HADOOP-4435
+   * Version 18: Added blacklisted trackers to the ClusterStatus 
+   *             for HADOOP-4305
+   * Version 19: Modified TaskReport to have TIP status and modified the
+   *             method getClusterStatus() to take a boolean argument
+   *             for HADOOP-4807
+   * Version 20: Modified ClusterStatus to have the tasktracker expiry
+   *             interval for HADOOP-4939
+   * Version 21: Modified TaskID to be aware of the new TaskTypes                                 
+   * Version 22: Added method getQueueAclsForCurrentUser to get queue acls info
+   *             for a user
+   * Version 23: Modified the JobQueueInfo class to include queue state.
+   *             Part of HADOOP-5913.  
+   * Version 24: Modified ClusterStatus to include BlackListInfo class which 
+   *             encapsulates reasons and report for blacklisted node.          
+   * Version 25: Added fields to JobStatus for HADOOP-817.   
+   * Version 26: Added properties to JobQueueInfo as part of MAPREDUCE-861.
+   *              added new APIs getRootQueues and
+   *              getChildQueues(String queueName)
+   * Version 27: Changed protocol to use new api objects. And the protocol is 
+   *             renamed from JobSubmissionProtocol to ClientProtocol.          
+   */
+  public static final long versionID = 27L;
+
+  /**
+   * Allocate an id for the job.
+   * @return a unique job id for submitting jobs.
+   * @throws IOException
+   */
+  public JobID getNewJobID() throws IOException, InterruptedException;
+
+  /**
+   * Submit a Job for execution.  Returns the latest status for
+   * that job.
+   * The job files should be submitted in <b>system-dir</b>/<b>jobName</b>.
+   */
+  public JobStatus submitJob(JobID jobName) 
+  throws IOException, InterruptedException;
+
+  /**
+   * Get the current status of the cluster
+   * 
+   * @return summary of the state of the cluster
+   */
+  public ClusterMetrics getClusterMetrics() 
+  throws IOException, InterruptedException;
+  
+  /**
+   * Get JobTracker's state
+   * 
+   * @return {@link State} of the JobTracker
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public State getJobTrackerState() throws IOException, InterruptedException;
+  
+  public long getTaskTrackerExpiryInterval() throws IOException,
+                                               InterruptedException;
+  /**
+   * Kill the indicated job
+   */
+  public void killJob(JobID jobid) throws IOException, InterruptedException;
+
+  /**
+   * Set the priority of the specified job
+   * @param jobid ID of the job
+   * @param priority Priority to be set for the job
+   */
+  public void setJobPriority(JobID jobid, String priority) 
+  throws IOException, InterruptedException;
+  
+  /**
+   * Kill indicated task attempt.
+   * @param taskId the id of the task to kill.
+   * @param shouldFail if true the task is failed and added to failed tasks list, otherwise
+   * it is just killed, w/o affecting job failure status.  
+   */ 
+  public boolean killTask(TaskAttemptID taskId, boolean shouldFail) 
+  throws IOException, InterruptedException;
+  
+  /**
+   * Grab a handle to a job that is already known to the JobTracker.
+   * @return Status of the job, or null if not found.
+   */
+  public JobStatus getJobStatus(JobID jobid) 
+  throws IOException, InterruptedException;
+
+  /**
+   * Grab the current job counters
+   */
+  public Counters getJobCounters(JobID jobid) 
+  throws IOException, InterruptedException;
+    
+  /**
+   * Grab a bunch of info on the tasks that make up the job
+   */
+  public TaskReport[] getTaskReports(JobID jobid, TaskType type)
+  throws IOException, InterruptedException;
+
+  /**
+   * A MapReduce system always operates on a single filesystem.  This 
+   * function returns the fs name.  ('local' if the localfs; 'addr:port' 
+   * if dfs).  The client can then copy files into the right locations 
+   * prior to submitting the job.
+   */
+  public String getFilesystemName() throws IOException, InterruptedException;
+
+  /** 
+   * Get all the jobs submitted. 
+   * @return array of JobStatus for the submitted jobs
+   */
+  public JobStatus[] getAllJobs() throws IOException, InterruptedException;
+  
+  /**
+   * Get task completion events for the jobid, starting from fromEventId. 
+   * Returns an empty array if no events are available. 
+   * @param jobid job id 
+   * @param fromEventId event id to start from.
+   * @param maxEvents the max number of events we want to look at 
+   * @return array of task completion events. 
+   * @throws IOException
+   */
+  public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobid,
+    int fromEventId, int maxEvents) throws IOException, InterruptedException;
+    
+  /**
+   * Get the diagnostics for a given task in a given job
+   * @param taskId the id of the task
+   * @return an array of the diagnostic messages
+   */
+  public String[] getTaskDiagnostics(TaskAttemptID taskId) 
+  throws IOException, InterruptedException;
+
+  /** 
+   * Get all active trackers in cluster. 
+   * @return array of TaskTrackerInfo
+   */
+  public TaskTrackerInfo[] getActiveTrackers() 
+  throws IOException, InterruptedException;
+
+  /** 
+   * Get all blacklisted trackers in cluster. 
+   * @return array of TaskTrackerInfo
+   */
+  public TaskTrackerInfo[] getBlacklistedTrackers() 
+  throws IOException, InterruptedException;
+
+  /**
+   * Grab the jobtracker system directory path 
+   * where job-specific files are to be placed.
+   * 
+   * @return the system directory where job-specific files are to be placed.
+   */
+  public String getSystemDir() throws IOException, InterruptedException;  
+  
+  /**
+   * Gets the set of queues associated with the JobTracker.
+   * 
+   * @return Array of the Queue Information Object
+   * @throws IOException 
+   */
+  public QueueInfo[] getQueues() throws IOException, InterruptedException;
+  
+  /**
+   * Gets scheduling information associated with the particular Job queue
+   * 
+   * @param queueName Queue Name
+   * @return Scheduling Information of the Queue
+   * @throws IOException 
+   */
+  public QueueInfo getQueue(String queueName) 
+  throws IOException, InterruptedException;
+  
+  /**
+   * Gets the Queue ACLs for current user
+   * @return array of QueueAclsInfo object for current user.
+   * @throws IOException
+   */
+  public QueueAclsInfo[] getQueueAclsForCurrentUser() 
+  throws IOException, InterruptedException;
+  
+  /**
+   * Gets the root level queues.
+   * @return array of JobQueueInfo object.
+   * @throws IOException
+   */
+  public QueueInfo[] getRootQueues() throws IOException, InterruptedException;
+  
+  /**
+   * Returns immediate children of queueName.
+   * @param queueName name of the parent queue
+   * @return array of JobQueueInfo which are children of queueName
+   * @throws IOException
+   */
+  public QueueInfo[] getChildQueues(String queueName) 
+  throws IOException, InterruptedException;
+
+}
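
Client code normally reaches these methods indirectly, through the Cluster
facade, as the CLI class added below does. A brief sketch, assuming Cluster
exposes the queue calls declared here:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Cluster;
    import org.apache.hadoop.mapreduce.QueueInfo;

    public class QueueLister {
      public static void main(String[] args) throws Exception {
        Cluster cluster = new Cluster(new Configuration());
        try {
          // Walk the top level of the queue hierarchy.
          for (QueueInfo queue : cluster.getRootQueues()) {
            System.out.println(queue.getQueueName() + " : "
                + queue.getState());
          }
        } finally {
          cluster.close();
        }
      }
    }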

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/State.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/State.java?rev=816496&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/State.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/State.java Fri Sep 18 07:04:42 2009
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.server.jobtracker;
+
+/**
+ * Describes the state of the JobTracker.
+ */
+public enum State {
+  INITIALIZING, RUNNING;
+}

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/tools/CLI.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/tools/CLI.java?rev=816496&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/tools/CLI.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/tools/CLI.java Fri Sep 18 07:04:42 2009
@@ -0,0 +1,529 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.tools;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TIPStatus;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Interprets the Map/Reduce command-line options.
+ */
+public class CLI extends Configured implements Tool {
+  private static final Log LOG = LogFactory.getLog(CLI.class);
+
+  public CLI() {
+  }
+  
+  public CLI(Configuration conf) {
+    setConf(conf);
+  }
+  
+  public int run(String[] argv) throws Exception {
+    int exitCode = -1;
+    if (argv.length < 1) {
+      displayUsage("");
+      return exitCode;
+    }    
+    // process arguments
+    String cmd = argv[0];
+    String submitJobFile = null;
+    String jobid = null;
+    String taskid = null;
+    String historyFile = null;
+    String counterGroupName = null;
+    String counterName = null;
+    JobPriority jp = null;
+    String taskType = null;
+    String taskState = null;
+    int fromEvent = 0;
+    int nEvents = 0;
+    boolean getStatus = false;
+    boolean getCounter = false;
+    boolean killJob = false;
+    boolean listEvents = false;
+    boolean viewHistory = false;
+    boolean viewAllHistory = false;
+    boolean listJobs = false;
+    boolean listAllJobs = false;
+    boolean listActiveTrackers = false;
+    boolean listBlacklistedTrackers = false;
+    boolean displayTasks = false;
+    boolean killTask = false;
+    boolean failTask = false;
+    boolean setJobPriority = false;
+
+    if ("-submit".equals(cmd)) {
+      if (argv.length != 2) {
+        displayUsage(cmd);
+        return exitCode;
+      }
+      submitJobFile = argv[1];
+    } else if ("-status".equals(cmd)) {
+      if (argv.length != 2) {
+        displayUsage(cmd);
+        return exitCode;
+      }
+      jobid = argv[1];
+      getStatus = true;
+    } else if("-counter".equals(cmd)) {
+      if (argv.length != 4) {
+        displayUsage(cmd);
+        return exitCode;
+      }
+      getCounter = true;
+      jobid = argv[1];
+      counterGroupName = argv[2];
+      counterName = argv[3];
+    } else if ("-kill".equals(cmd)) {
+      if (argv.length != 2) {
+        displayUsage(cmd);
+        return exitCode;
+      }
+      jobid = argv[1];
+      killJob = true;
+    } else if ("-set-priority".equals(cmd)) {
+      if (argv.length != 3) {
+        displayUsage(cmd);
+        return exitCode;
+      }
+      jobid = argv[1];
+      try {
+        jp = JobPriority.valueOf(argv[2]); 
+      } catch (IllegalArgumentException iae) {
+        LOG.info(iae);
+        displayUsage(cmd);
+        return exitCode;
+      }
+      setJobPriority = true; 
+    } else if ("-events".equals(cmd)) {
+      if (argv.length != 4) {
+        displayUsage(cmd);
+        return exitCode;
+      }
+      jobid = argv[1];
+      fromEvent = Integer.parseInt(argv[2]);
+      nEvents = Integer.parseInt(argv[3]);
+      listEvents = true;
+    } else if ("-history".equals(cmd)) {
+      if (argv.length != 2 && !(argv.length == 3 && "all".equals(argv[1]))) {
+         displayUsage(cmd);
+         return exitCode;
+      }
+      viewHistory = true;
+      if (argv.length == 3 && "all".equals(argv[1])) {
+        viewAllHistory = true;
+        historyFile = argv[2];
+      } else {
+        historyFile = argv[1];
+      }
+    } else if ("-list".equals(cmd)) {
+      if (argv.length != 1 && !(argv.length == 2 && "all".equals(argv[1]))) {
+        displayUsage(cmd);
+        return exitCode;
+      }
+      if (argv.length == 2 && "all".equals(argv[1])) {
+        listAllJobs = true;
+      } else {
+        listJobs = true;
+      }
+    } else if("-kill-task".equals(cmd)) {
+      if (argv.length != 2) {
+        displayUsage(cmd);
+        return exitCode;
+      }
+      killTask = true;
+      taskid = argv[1];
+    } else if("-fail-task".equals(cmd)) {
+      if (argv.length != 2) {
+        displayUsage(cmd);
+        return exitCode;
+      }
+      failTask = true;
+      taskid = argv[1];
+    } else if ("-list-active-trackers".equals(cmd)) {
+      if (argv.length != 1) {
+        displayUsage(cmd);
+        return exitCode;
+      }
+      listActiveTrackers = true;
+    } else if ("-list-blacklisted-trackers".equals(cmd)) {
+      if (argv.length != 1) {
+        displayUsage(cmd);
+        return exitCode;
+      }
+      listBlacklistedTrackers = true;
+    } else if ("-list-attempt-ids".equals(cmd)) {
+      if (argv.length != 4) {
+        displayUsage(cmd);
+        return exitCode;
+      }
+      jobid = argv[1];
+      taskType = argv[2];
+      taskState = argv[3];
+      displayTasks = true;
+    } else {
+      displayUsage(cmd);
+      return exitCode;
+    }
+
+    // initialize cluster
+    Cluster cluster = new Cluster(getConf());
+        
+    // Submit the request
+    try {
+      if (submitJobFile != null) {
+        Job job = Job.getInstance(cluster, new JobConf(submitJobFile));
+        job.submit();
+        System.out.println("Created job " + job.getID());
+        exitCode = 0;
+      } else if (getStatus) {
+        Job job = cluster.getJob(JobID.forName(jobid));
+        if (job == null) {
+          System.out.println("Could not find job " + jobid);
+        } else {
+          System.out.println();
+          System.out.println(job);
+          Counters counters = job.getCounters();
+          if (counters != null) {
+            System.out.println(counters);
+          } else {
+            System.out.println("Counters not available. Job is retired.");
+          }
+          exitCode = 0;
+        }
+      } else if (getCounter) {
+        Job job = cluster.getJob(JobID.forName(jobid));
+        if (job == null) {
+          System.out.println("Could not find job " + jobid);
+        } else {
+          Counters counters = job.getCounters();
+          if (counters == null) {
+            System.out.println("Counters not available for retired job " + 
+            jobid);
+            exitCode = -1;
+          } else {
+            System.out.println(getCounter(counters,
+              counterGroupName, counterName));
+            exitCode = 0;
+          }
+        }
+      } else if (killJob) {
+        Job job = cluster.getJob(JobID.forName(jobid));
+        if (job == null) {
+          System.out.println("Could not find job " + jobid);
+        } else {
+          job.killJob();
+          System.out.println("Killed job " + jobid);
+          exitCode = 0;
+        }
+      } else if (setJobPriority) {
+        Job job = cluster.getJob(JobID.forName(jobid));
+        if (job == null) {
+          System.out.println("Could not find job " + jobid);
+        } else {
+          job.setPriority(jp);
+          System.out.println("Changed job priority.");
+          exitCode = 0;
+        } 
+      } else if (viewHistory) {
+        viewHistory(historyFile, viewAllHistory);
+        exitCode = 0;
+      } else if (listEvents) {
+        listEvents(cluster.getJob(JobID.forName(jobid)), fromEvent, nEvents);
+        exitCode = 0;
+      } else if (listJobs) {
+        listJobs(cluster);
+        exitCode = 0;
+      } else if (listAllJobs) {
+        listAllJobs(cluster);
+        exitCode = 0;
+      } else if (listActiveTrackers) {
+        listActiveTrackers(cluster);
+        exitCode = 0;
+      } else if (listBlacklistedTrackers) {
+        listBlacklistedTrackers(cluster);
+        exitCode = 0;
+      } else if (displayTasks) {
+        displayTasks(cluster.getJob(JobID.forName(jobid)), taskType, taskState);
+      } else if(killTask) {
+        TaskAttemptID taskID = TaskAttemptID.forName(taskid);
+        Job job = cluster.getJob(taskID.getJobID());
+        if (job == null) {
+          System.out.println("Could not find job " + jobid);
+        } else if (job.killTask(taskID)) {
+          System.out.println("Killed task " + taskid);
+          exitCode = 0;
+        } else {
+          System.out.println("Could not kill task " + taskid);
+          exitCode = -1;
+        }
+      } else if(failTask) {
+        TaskAttemptID taskID = TaskAttemptID.forName(taskid);
+        Job job = cluster.getJob(taskID.getJobID());
+        if (job == null) {
+            System.out.println("Could not find job " + jobid);
+        } else if(job.failTask(taskID)) {
+          System.out.println("Killed task " + taskID + " by failing it");
+          exitCode = 0;
+        } else {
+          System.out.println("Could not fail task " + taskid);
+          exitCode = -1;
+        }
+      }
+    } finally {
+      cluster.close();
+    }
+    return exitCode;
+  }
+
+  private String getJobPriorityNames() {
+    StringBuffer sb = new StringBuffer();
+    for (JobPriority p : JobPriority.values()) {
+      sb.append(p.name()).append(" ");
+    }
+    return sb.substring(0, sb.length()-1);
+  }
+
+  private String getTaskTypes() {
+    StringBuffer sb = new StringBuffer();
+    for (TaskType t : TaskType.values()) {
+      sb.append(t.name()).append(" ");
+    }
+    return sb.substring(0, sb.length()-1);
+  }
+
+  /**
+   * Display usage of the command-line tool.
+   */
+  private void displayUsage(String cmd) {
+    String prefix = "Usage: CLI ";
+    String jobPriorityValues = getJobPriorityNames();
+    String taskTypes = getTaskTypes();
+    String taskStates = "running, completed";
+    if ("-submit".equals(cmd)) {
+      System.err.println(prefix + "[" + cmd + " <job-file>]");
+    } else if ("-status".equals(cmd) || "-kill".equals(cmd)) {
+      System.err.println(prefix + "[" + cmd + " <job-id>]");
+    } else if ("-counter".equals(cmd)) {
+      System.err.println(prefix + "[" + cmd + 
+        " <job-id> <group-name> <counter-name>]");
+    } else if ("-events".equals(cmd)) {
+      System.err.println(prefix + "[" + cmd + 
+        " <job-id> <from-event-#> <#-of-events>]");
+    } else if ("-history".equals(cmd)) {
+      System.err.println(prefix + "[" + cmd + " <jobHistoryFile>]");
+    } else if ("-list".equals(cmd)) {
+      System.err.println(prefix + "[" + cmd + " [all]]");
+    } else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) {
+      System.err.println(prefix + "[" + cmd + " <task-attempt-id>]");
+    } else if ("-set-priority".equals(cmd)) {
+      System.err.println(prefix + "[" + cmd + " <job-id> <priority>]. " +
+          "Valid values for priorities are: " 
+          + jobPriorityValues); 
+    } else if ("-list-active-trackers".equals(cmd)) {
+      System.err.println(prefix + "[" + cmd + "]");
+    } else if ("-list-blacklisted-trackers".equals(cmd)) {
+      System.err.println(prefix + "[" + cmd + "]");
+    } else if ("-list-attempt-ids".equals(cmd)) {
+      System.err.println(prefix + "[" + cmd + 
+          " <job-id> <task-type> <task-state>]. " +
+          "Valid values for <task-type> are " + taskTypes + ". " +
+          "Valid values for <task-state> are " + taskStates);
+    } else {
+      System.err.printf(prefix + "<command> <args>\n");
+      System.err.printf("\t[-submit <job-file>]\n");
+      System.err.printf("\t[-status <job-id>]\n");
+      System.err.printf("\t[-counter <job-id> <group-name> <counter-name>]\n");
+      System.err.printf("\t[-kill <job-id>]\n");
+      System.err.printf("\t[-set-priority <job-id> <priority>]. " +
+        "Valid values for priorities are: " + jobPriorityValues + "\n");
+      System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]\n");
+      System.err.printf("\t[-history <jobHistoryFile>]\n");
+      System.err.printf("\t[-list [all]]\n");
+      System.err.printf("\t[-list-active-trackers]\n");
+      System.err.printf("\t[-list-blacklisted-trackers]\n");
+      System.err.println("\t[-list-attempt-ids <job-id> <task-type> " +
+        "<task-state>]. " +
+        "Valid values for <task-type> are " + taskTypes + ". " +
+        "Valid values for <task-state> are " + taskStates);
+      System.err.printf("\t[-kill-task <task-attempt-id>]\n");
+      System.err.printf("\t[-fail-task <task-attempt-id>]\n\n");
+      ToolRunner.printGenericCommandUsage(System.out);
+    }
+  }
+    
+  private void viewHistory(String historyFile, boolean all) 
+    throws IOException {
+    HistoryViewer historyViewer = new HistoryViewer(historyFile,
+                                        getConf(), all);
+    historyViewer.print();
+  }
+
+  protected long getCounter(Counters counters, String counterGroupName,
+      String counterName) throws IOException {
+    return counters.findCounter(counterGroupName, counterName).getValue();
+  }
+  
+  /**
+   * List the events for the given job
+   * @param job the job whose events are to be listed
+   * @param fromEventId the event id to start from
+   * @param numEvents the number of events to list
+   * @throws IOException
+   */
+  private void listEvents(Job job, int fromEventId, int numEvents)
+      throws IOException, InterruptedException {
+    TaskCompletionEvent[] events =
+      job.getTaskCompletionEvents(fromEventId, numEvents);
+    System.out.println("Task completion events for " + job.getID());
+    System.out.println("Number of events (from " + fromEventId + ") are: " 
+      + events.length);
+    for(TaskCompletionEvent event: events) {
+      System.out.println(event.getStatus() + " " + 
+        event.getTaskAttemptId() + " " + 
+        getTaskLogURL(event.getTaskAttemptId(), event.getTaskTrackerHttp()));
+    }
+  }
+
+  protected static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
+    return (baseUrl + "/tasklog?plaintext=true&taskid=" + taskId); 
+  }
+  
+
+  /**
+   * Dump a list of currently running jobs
+   * @throws IOException
+   */
+  private void listJobs(Cluster cluster) 
+      throws IOException, InterruptedException {
+    List<Job> runningJobs = new ArrayList<Job>();
+    for (Job job : cluster.getAllJobs()) {
+      if (!job.isComplete()) {
+        runningJobs.add(job);
+      }
+    }
+    displayJobList(runningJobs.toArray(new Job[0]));
+  }
+    
+  /**
+   * Dump a list of all jobs submitted.
+   * @throws IOException
+   */
+  private void listAllJobs(Cluster cluster) 
+      throws IOException, InterruptedException {
+    displayJobList(cluster.getAllJobs());
+  }
+  
+  /**
+   * Display the list of active trackers
+   */
+  private void listActiveTrackers(Cluster cluster) 
+      throws IOException, InterruptedException {
+    TaskTrackerInfo[] trackers = cluster.getActiveTaskTrackers();
+    for (TaskTrackerInfo tracker : trackers) {
+      System.out.println(tracker.getTaskTrackerName());
+    }
+  }
+
+  /**
+   * Display the list of blacklisted trackers
+   */
+  private void listBlacklistedTrackers(Cluster cluster) 
+      throws IOException, InterruptedException {
+    TaskTrackerInfo[] trackers = cluster.getBlackListedTaskTrackers();
+    if (trackers.length > 0) {
+      System.out.println("BlackListedNode \t Reason");
+    }
+    for (TaskTrackerInfo tracker : trackers) {
+      System.out.println(tracker.getTaskTrackerName() + "\t" + 
+        tracker.getReasonForBlacklist());
+    }
+  }
+
+  private void printTaskAttempts(TaskReport report) {
+    if (report.getCurrentStatus() == TIPStatus.COMPLETE) {
+      System.out.println(report.getSuccessfulTaskAttemptId());
+    } else if (report.getCurrentStatus() == TIPStatus.RUNNING) {
+      for (TaskAttemptID t : 
+        report.getRunningTaskAttemptIds()) {
+        System.out.println(t);
+      }
+    }
+  }
+
+  /**
+   * Display information about a job's tasks of a particular type
+   * and in a particular state.
+   * 
+   * @param job the job
+   * @param type the type of the task (map/reduce/setup/cleanup)
+   * @param state the state of the task 
+   * (pending/running/completed/failed/killed)
+   */
+  protected void displayTasks(Job job, String type, String state) 
+  throws IOException, InterruptedException {
+    TaskReport[] reports = job.getTaskReports(TaskType.valueOf(type));
+    for (TaskReport report : reports) {
+      TIPStatus status = report.getCurrentStatus();
+      if ((state.equals("pending") && status == TIPStatus.PENDING) ||
+          (state.equals("running") && status == TIPStatus.RUNNING) ||
+          (state.equals("completed") && status == TIPStatus.COMPLETE) ||
+          (state.equals("failed") && status == TIPStatus.FAILED) ||
+          (state.equals("killed") && status == TIPStatus.KILLED)) {
+        printTaskAttempts(report);
+      }
+    }
+  }
+  
+  protected void displayJobList(Job[] jobs) 
+      throws IOException, InterruptedException {
+    System.out.println("Total jobs:" + jobs.length);
+    System.out.println("JobId\tState\tStartTime\t" +
+      "UserName\tPriority\tSchedulingInfo");
+    for (Job job : jobs) {
+      System.out.printf("%s\t%s\t%d\t%s\t%s\t%s\n", job.getID().toString(),
+        job.getJobState(), job.getStartTime(),
+        job.getUser(), job.getPriority().name(), job.getSchedulingInfo());
+    }
+  }
+  
+  public static void main(String[] argv) throws Exception {
+    int res = ToolRunner.run(new CLI(), argv);
+    System.exit(res);
+  }
+}
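
The new CLI is a standard Tool, so besides the shell entry point in main()
above it can be driven programmatically. A minimal sketch under that
assumption; the class name and the "-list all" arguments here are
illustrative, not part of the patch:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.tools.CLI;
  import org.apache.hadoop.util.ToolRunner;

  public class ListJobsExample {
    public static void main(String[] args) throws Exception {
      // Same effect as passing "-list all" to the CLI's own main():
      // prints every submitted job and yields the tool's return code.
      int exitCode = ToolRunner.run(new Configuration(), new CLI(),
          new String[] { "-list", "all" });
      System.exit(exitCode);
    }
  }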

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java?rev=816496&r1=816495&r2=816496&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java Fri Sep 18 07:04:42 2009
@@ -29,6 +29,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.TaskStatus.Phase;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
@@ -71,7 +72,7 @@
   }
 
   static class FakeJobInProgress extends JobInProgress {
-    JobClient.RawSplit[] rawSplits;
+    Job.RawSplit[] rawSplits;
     @SuppressWarnings("deprecation")
     FakeJobInProgress(JobConf jobConf, JobTracker tracker) throws IOException {
       super(new JobID(jtIdentifier, ++jobCounter), jobConf, tracker);
@@ -85,7 +86,7 @@
     @Override
     public synchronized void initTasks() throws IOException {
      
-      JobClient.RawSplit[] splits = createSplits();
+      Job.RawSplit[] splits = createSplits();
       numMapTasks = splits.length;
       createMapTasks(null, splits);
       nonRunningMapCache = createCache(splits, maxLevel);
@@ -95,17 +96,17 @@
     }
     
     @Override
-    JobClient.RawSplit[] createSplits(){
-      JobClient.RawSplit[] splits = new JobClient.RawSplit[numMapTasks];
+    Job.RawSplit[] createSplits(){
+      Job.RawSplit[] splits = new Job.RawSplit[numMapTasks];
       for (int i = 0; i < numMapTasks; i++) {
-        splits[i] = new JobClient.RawSplit();
+        splits[i] = new Job.RawSplit();
         splits[i].setLocations(new String[0]);
       }
       return splits;
     }
     
     @Override
-    protected void createMapTasks(String ignored, JobClient.RawSplit[] splits) {
+    protected void createMapTasks(String ignored, Job.RawSplit[] splits) {
       maps = new TaskInProgress[numMapTasks];
       for (int i = 0; i < numMapTasks; i++) {
         maps[i] = new TaskInProgress(getJobID(), "test", 

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobClient.java?rev=816496&r1=816495&r2=816496&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobClient.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobClient.java Fri Sep 18 07:04:42 2009
@@ -17,32 +17,24 @@
  */
 package org.apache.hadoop.mapred;
 
-import java.io.BufferedReader;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
-import java.io.PrintStream;
 import java.io.Writer;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.TestMRJobClient;
+import org.apache.hadoop.mapreduce.tools.CLI;
 import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
 
-public class TestJobClient extends ClusterMapReduceTestCase {
-  
-  private static final Log LOG = LogFactory.getLog(TestJobClient.class);
+public class TestJobClient extends TestMRJobClient {
   
   private String runJob() throws Exception {
-    OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
+    OutputStream os = getFileSystem().create(new Path(getInputDir(),
+                        "text.txt"));
     Writer wr = new OutputStreamWriter(os);
     wr.write("hello1\n");
     wr.write("hello2\n");
@@ -71,60 +63,27 @@
     return JobClient.runJob(conf).getID().toString();
   }
   
-  static int runTool(Configuration conf, Tool tool, String[] args, OutputStream out) throws Exception {
-    PrintStream oldOut = System.out;
-    PrintStream newOut = new PrintStream(out, true);
-    try {
-      System.setOut(newOut);
-      return ToolRunner.run(conf, tool, args);
-    } finally {
-      System.setOut(oldOut);
-    }
+  public static int runTool(Configuration conf, Tool tool, String[] args,
+      OutputStream out) throws Exception {
+    return TestMRJobClient.runTool(conf, tool, args, out);
   }
-
-  public void testGetCounter() throws Exception {
-    String jobId = runJob();
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    int exitCode = runTool(createJobConf(), new JobClient(),
-        new String[] { "-counter", jobId,
-        "org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS" },
-        out);
-    assertEquals("Exit code", 0, exitCode);
-    assertEquals("Counter", "3", out.toString().trim());
+  
+  static void verifyJobPriority(String jobId, String priority,
+      JobConf conf)  throws Exception {
+    TestJobClient test = new TestJobClient();
+    test.verifyJobPriority(jobId, priority, conf, test.createJobClient());
   }
-
-  public void testJobList() throws Exception {
+  
+  public void testJobClient() throws Exception {
+    Configuration conf = createJobConf();
     String jobId = runJob();
-    verifyJobPriority(jobId, "HIGH", createJobConf());
-  }
-
-  static void verifyJobPriority(String jobId, String priority, JobConf conf)
-                            throws Exception {
-    PipedInputStream pis = new PipedInputStream();
-    PipedOutputStream pos = new PipedOutputStream(pis);
-    int exitCode = runTool(conf, new JobClient(),
-        new String[] { "-list", "all" },
-        pos);
-    assertEquals("Exit code", 0, exitCode);
-    BufferedReader br = new BufferedReader(new InputStreamReader(pis));
-    String line = null;
-    while ((line=br.readLine()) != null) {
-      LOG.info("line = " + line);
-      if (!line.startsWith(jobId)) {
-        continue;
-      }
-      assertTrue(line.contains(priority));
-      break;
-    }
-    pis.close();
+    testGetCounter(jobId, conf);
+    testJobList(jobId, conf);
+    testChangingJobPriority(jobId, conf);
   }
   
-  public void testChangingJobPriority() throws Exception {
-    String jobId = runJob();
-    int exitCode = runTool(createJobConf(), new JobClient(),
-        new String[] { "-set-priority", jobId, "VERY_LOW" },
-        new ByteArrayOutputStream());
-    assertEquals("Exit code", 0, exitCode);
-    verifyJobPriority(jobId, "VERY_LOW", createJobConf());
+  protected CLI createJobClient() 
+      throws IOException {
+    return new JobClient();
   }
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java?rev=816496&r1=816495&r2=816496&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java Fri Sep 18 07:04:42 2009
@@ -38,10 +38,11 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
-import org.apache.hadoop.mapred.JobClient.RawSplit;
 import org.apache.hadoop.mapred.TaskStatus.Phase;
 import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.Job.RawSplit;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.StaticMapping;
@@ -99,14 +100,14 @@
     }
 
     @Override
-    JobClient.RawSplit[] createSplits() {
+    Job.RawSplit[] createSplits() {
       // Set all splits to reside on one host. This will ensure that 
       // one tracker gets data local, one gets rack local and two others
       // get non-local maps
-      RawSplit[] splits = new RawSplit[numMapTasks];
+      Job.RawSplit[] splits = new Job.RawSplit[numMapTasks];
       String[] splitHosts0 = new String[] { hosts[0] };
       for (int i = 0; i < numMapTasks; i++) {
-        splits[i] = new RawSplit();
+        splits[i] = new Job.RawSplit();
         splits[i].setDataLength(0);
         splits[i].setLocations(splitHosts0);
       }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java?rev=816496&r1=816495&r2=816496&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java Fri Sep 18 07:04:42 2009
@@ -27,9 +27,10 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Job.RawSplit;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.hadoop.mapred.JobClient.RawSplit;
 
 /**
  *  Validates map phase progress.
@@ -139,7 +140,7 @@
     jobId = taskId.getJobID();
     
     JobContext jContext = new JobContext(job, jobId);
-    RawSplit[] rawSplits = LocalJobRunner.getRawSplits(jContext, job);
+    Job.RawSplit[] rawSplits = LocalJobRunner.getRawSplits(jContext, job);
 
     job.setUseNewMapper(true); // use new api
     for (int i = 0; i < rawSplits.length; i++) {// rawSplits.length is 1

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java?rev=816496&r1=816495&r2=816496&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java Fri Sep 18 07:04:42 2009
@@ -28,9 +28,10 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobHistory;
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
-import org.apache.hadoop.mapred.JobClient.RawSplit;
 import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.Job.RawSplit;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.StaticMapping;
 
@@ -91,7 +92,7 @@
 
     @Override
     public void initTasks() throws IOException {
-      JobClient.RawSplit[] splits = createSplits();
+      Job.RawSplit[] splits = createSplits();
       numMapTasks = splits.length;
       createMapTasks(null, splits);
       nonRunningMapCache = createCache(splits, maxLevel);
@@ -101,8 +102,8 @@
     }
   
 
-    protected JobClient.RawSplit[] createSplits() throws IOException {
-      RawSplit[] splits = new RawSplit[numMaps];
+    protected Job.RawSplit[] createSplits() throws IOException {
+      Job.RawSplit[] splits = new Job.RawSplit[numMaps];
       // Hand code for now. 
       // M0,2,3 reside in Host1
       // M1 resides in Host3
@@ -110,7 +111,7 @@
       String[] splitHosts0 = new String[] { allHosts[0] };
 
       for (int i = 0; i < numMaps; i++) {
-        splits[i] = new RawSplit();
+        splits[i] = new Job.RawSplit();
         splits[i].setDataLength(0);
       }
 

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java?rev=816496&r1=816495&r2=816496&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java Fri Sep 18 07:04:42 2009
@@ -18,7 +18,9 @@
 package org.apache.hadoop.mapred;
 
 import junit.framework.TestCase;
-import org.apache.hadoop.mapred.JobClient.RawSplit;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Job.RawSplit;
 
 public class TestResourceEstimation extends TestCase {
   
@@ -44,7 +46,7 @@
       
       TaskStatus ts = new MapTaskStatus();
       ts.setOutputSize(singleMapOutputSize);
-      RawSplit split = new RawSplit();
+      Job.RawSplit split = new Job.RawSplit();
       split.setDataLength(0);
       TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0, 1);
       re.updateWithCompletedTask(ts, tip);
@@ -79,7 +81,7 @@
       
       TaskStatus ts = new MapTaskStatus();
       ts.setOutputSize(singleMapOutputSize);
-      RawSplit split = new RawSplit();
+      Job.RawSplit split = new Job.RawSplit();
       split.setDataLength(singleMapInputSize);
       TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0, 1);
       re.updateWithCompletedTask(ts, tip);
@@ -91,7 +93,7 @@
     //add one more map task with input size as 0
     TaskStatus ts = new MapTaskStatus();
     ts.setOutputSize(singleMapOutputSize);
-    RawSplit split = new RawSplit();
+    Job.RawSplit split = new Job.RawSplit();
     split.setDataLength(0);
     TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0, 1);
     re.updateWithCompletedTask(ts, tip);

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java?rev=816496&r1=816495&r2=816496&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java Fri Sep 18 07:04:42 2009
@@ -23,6 +23,7 @@
 
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
@@ -61,7 +62,7 @@
     @Override
     public synchronized void initTasks() throws IOException {
       super.initTasks();
-      JobClient.RawSplit emptySplit = new JobClient.RawSplit();
+      Job.RawSplit emptySplit = new Job.RawSplit();
       setup = new TaskInProgress[2];
       setup[0] = new TaskInProgress(getJobID(), "test",  emptySplit,
           jobtracker, getJobConf(), this, numMapTasks + 1, 1);

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java?rev=816496&r1=816495&r2=816496&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java Fri Sep 18 07:04:42 2009
@@ -376,8 +376,6 @@
 
     conf.set("hadoop.job.history.user.location", histDir.toString());
 
-    conf.setNumReduceTasks(numReduces);
-
     FileInputFormat.setInputPaths(conf, inDir);
     FileOutputFormat.setOutputPath(conf, outDir);
 

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java?rev=816496&r1=816495&r2=816496&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java Fri Sep 18 07:04:42 2009
@@ -30,6 +30,7 @@
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskType;
 
 public class TestTaskFail extends TestCase {
@@ -179,7 +180,7 @@
       JobConf jobConf = mr.createJobConf();
       // turn down the completion poll interval from the 5 second default
       // for better test performance.
-      jobConf.set(JobClient.NetworkedJob.COMPLETION_POLL_INTERVAL_KEY, "50");
+      jobConf.set(Job.COMPLETION_POLL_INTERVAL_KEY, "50");
       jobConf.setOutputCommitter(CommitterWithLogs.class);
       RunningJob rJob = launchJob(jobConf, inDir, outDir, input);
       rJob.waitForCompletion();
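
The completion-poll key set above now lives on org.apache.hadoop.mapreduce.Job
instead of JobClient.NetworkedJob. A minimal sketch of the same tuning from
client code; the millisecond unit is assumed from the test's "5 second
default" comment, and the class name is illustrative:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.Job;

  public class PollIntervalExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // Poll for job completion every 50ms rather than the 5s default
      // (unit assumed to be milliseconds, as in the test above).
      conf.set(Job.COMPLETION_POLL_INTERVAL_KEY, "50");
      Job job = new Job(conf);
      // ... configure mapper, reducer and I/O paths here, then:
      // job.waitForCompletion(true);
    }
  }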

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=816496&r1=816495&r2=816496&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Fri Sep 18 07:04:42 2009
@@ -28,10 +28,12 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -127,17 +129,16 @@
     trackerFConf.setStrings("mapred.local.dir", localDirs);
 
     // Create the job configuration file. Same as trackerConf in this test.
-    JobConf jobConf = trackerFConf;
+    Job job = new Job(trackerFConf);
 
-    // JobClient sets the job credentials.
-    new JobClient().setUGIAndUserGroupNames(jobConf);
+    job.setUGIAndUserGroupNames();
 
     // JobClient uploads the job jar to the file system and sets it in the
     // jobConf.
-    uploadJobJar(jobConf);
+    uploadJobJar(job);
 
     // JobClient uploads the jobConf to the file system.
-    File jobConfFile = uploadJobConf(jobConf);
+    File jobConfFile = uploadJobConf(job.getConfiguration());
 
     // Set up the TaskTracker
     tracker = new TaskTracker();
@@ -155,7 +156,7 @@
         new TaskAttemptID(jtIdentifier, jobId.getId(), TaskType.MAP, 1, 0);
     task =
         new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, null, 1);
-    task.setConf(jobConf); // Set conf. Set user name in particular.
+    task.setConf(job.getConfiguration()); // Set conf. Set user name in particular.
 
     taskController = new DefaultTaskController();
     taskController.setConf(trackerFConf);
@@ -166,11 +167,11 @@
   }
 
   /**
-   * @param jobConf
+   * @param job
    * @throws IOException
    * @throws FileNotFoundException
    */
-  private void uploadJobJar(JobConf jobConf)
+  private void uploadJobJar(Job job)
       throws IOException,
       FileNotFoundException {
     File jobJarFile = new File(TEST_ROOT_DIR, "jobjar-on-dfs.jar");
@@ -184,21 +185,21 @@
     jstream.closeEntry();
     jstream.finish();
     jstream.close();
-    jobConf.setJar(jobJarFile.toURI().toString());
+    job.setJar(jobJarFile.toURI().toString());
   }
 
   /**
-   * @param jobConf
+   * @param conf
    * @return
    * @throws FileNotFoundException
    * @throws IOException
    */
-  protected File uploadJobConf(JobConf jobConf)
+  protected File uploadJobConf(Configuration conf)
       throws FileNotFoundException,
       IOException {
     File jobConfFile = new File(TEST_ROOT_DIR, "jobconf-on-dfs.xml");
     FileOutputStream out = new FileOutputStream(jobConfFile);
-    jobConf.writeXml(out);
+    conf.writeXml(out);
     out.close();
     return jobConfFile;
   }
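
With the test now passing a plain Configuration to uploadJobConf(), the
serialization step is just Configuration.writeXml(). A short sketch of that
step; the jar and file paths are hypothetical:

  import java.io.File;
  import java.io.FileOutputStream;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.Job;

  public class WriteJobConfExample {
    public static void main(String[] args) throws Exception {
      Job job = new Job(new Configuration());
      job.setJar("file:///tmp/jobjar-example.jar");    // hypothetical path
      File xml = new File("/tmp/jobconf-example.xml"); // hypothetical path
      FileOutputStream out = new FileOutputStream(xml);
      // Everything set on the Job is reachable through its Configuration,
      // which writes itself out in the familiar job.xml format.
      job.getConfiguration().writeXml(out);
      out.close();
    }
  }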

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMRJobClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMRJobClient.java?rev=816496&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMRJobClient.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMRJobClient.java Fri Sep 18 07:04:42 2009
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.io.PrintStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.ClusterMapReduceTestCase;
+import org.apache.hadoop.mapreduce.tools.CLI;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class TestMRJobClient extends ClusterMapReduceTestCase {
+  
+  private static final Log LOG = LogFactory.getLog(TestMRJobClient.class);
+  
+  private Job runJob(Configuration conf) throws Exception {
+    String input = "hello1\nhello2\nhello3\n";
+
+    Job job = MapReduceTestUtil.createJob(conf,
+      getInputDir(), getOutputDir(), 1, 1, input);
+    job.setJobName("mr");
+    job.setPriority(JobPriority.HIGH);
+    job.waitForCompletion(true);
+    return job;
+  }
+  
+  public static int runTool(Configuration conf, Tool tool,
+      String[] args, OutputStream out) throws Exception {
+    PrintStream oldOut = System.out;
+    PrintStream newOut = new PrintStream(out, true);
+    try {
+      System.setOut(newOut);
+      return ToolRunner.run(conf, tool, args);
+    } finally {
+      System.setOut(oldOut);
+    }
+  }
+
+  public void testJobClient() throws Exception {
+    Configuration conf = createJobConf();
+    Job job = runJob(conf);
+    String jobId = job.getID().toString();
+    testGetCounter(jobId, conf);
+    testJobList(jobId, conf);
+    testChangingJobPriority(jobId, conf);
+  }
+  
+  public void testGetCounter(String jobId,
+      Configuration conf) throws Exception {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    int exitCode = runTool(conf, createJobClient(),
+        new String[] { "-counter", jobId,
+        "org.apache.hadoop.mapreduce.TaskCounter", "MAP_INPUT_RECORDS" },
+        out);
+    assertEquals("Exit code", 0, exitCode);
+    assertEquals("Counter", "3", out.toString().trim());
+  }
+
+  public void testJobList(String jobId,
+      Configuration conf) throws Exception {
+    verifyJobPriority(jobId, "HIGH", conf, createJobClient());
+  }
+
+  protected void verifyJobPriority(String jobId, String priority,
+      Configuration conf, CLI jc) throws Exception {
+    PipedInputStream pis = new PipedInputStream();
+    PipedOutputStream pos = new PipedOutputStream(pis);
+    int exitCode = runTool(conf, jc,
+        new String[] { "-list", "all" },
+        pos);
+    assertEquals("Exit code", 0, exitCode);
+    BufferedReader br = new BufferedReader(new InputStreamReader(pis));
+    String line = null;
+    while ((line = br.readLine()) != null) {
+      LOG.info("line = " + line);
+      if (!line.startsWith(jobId)) {
+        continue;
+      }
+      assertTrue(line.contains(priority));
+      break;
+    }
+    pis.close();
+  }
+  
+  public void testChangingJobPriority(String jobId, Configuration conf)
+      throws Exception {
+    int exitCode = runTool(conf, createJobClient(),
+        new String[] { "-set-priority", jobId, "VERY_LOW" },
+        new ByteArrayOutputStream());
+    assertEquals("Exit code", 0, exitCode);
+    verifyJobPriority(jobId, "VERY_LOW", conf, createJobClient());
+  }
+  
+  protected CLI createJobClient() throws IOException {
+    return new CLI();
+  }
+
+}
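
Because runTool() is public and static, tests outside this class can reuse
the same System.out capture trick. A sketch with an illustrative command;
it assumes the test classes are on the classpath:

  import java.io.ByteArrayOutputStream;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.TestMRJobClient;
  import org.apache.hadoop.mapreduce.tools.CLI;

  public class CaptureCliOutput {
    public static void main(String[] args) throws Exception {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      // runTool() temporarily swaps System.out, so whatever the CLI
      // prints for "-list-active-trackers" lands in the byte buffer.
      int exitCode = TestMRJobClient.runTool(new Configuration(),
          new CLI(), new String[] { "-list-active-trackers" }, out);
      System.err.println("exit=" + exitCode + "\n" + out.toString());
    }
  }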

Modified: hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp?rev=816496&r1=816495&r2=816496&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp (original)
+++ hadoop/mapreduce/trunk/src/webapps/job/jobtracker.jsp Fri Sep 18 07:04:42 2009
@@ -34,7 +34,7 @@
   ClusterStatus status = tracker.getClusterStatus();
   String trackerName = 
            StringUtils.simpleHostname(tracker.getJobTrackerMachine());
-  JobQueueInfo[] queues = tracker.getQueues();
+  JobQueueInfo[] queues = tracker.getJobQueues();
   Vector<JobInProgress> runningJobs = tracker.runningJobs();
   Vector<JobInProgress> completedJobs = tracker.completedJobs();
   Vector<JobInProgress> failedJobs = tracker.failedJobs();

Modified: hadoop/mapreduce/trunk/src/webapps/job/queuetable.jsp
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/webapps/job/queuetable.jsp?rev=816496&r1=816495&r2=816496&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/webapps/job/queuetable.jsp (original)
+++ hadoop/mapreduce/trunk/src/webapps/job/queuetable.jsp Fri Sep 18 07:04:42 2009
@@ -36,7 +36,7 @@
 <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
 <%
 JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
-JobQueueInfo[] queues = tracker.getRootQueues();
+JobQueueInfo[] queues = tracker.getRootJobQueues();
 %>
 <title>Queue Information</title>
 </head>


