incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1021240 - in /incubator/hama/trunk: CHANGES.txt bin/hama src/java/org/apache/hama/bsp/BSPJobClient.java src/java/org/apache/hama/bsp/BSPMaster.java src/java/org/apache/hama/bsp/ClusterStatus.java src/java/org/apache/hama/bsp/JobStatus.java
Date Mon, 11 Oct 2010 05:15:48 GMT
Author: edwardyoon
Date: Mon Oct 11 05:15:48 2010
New Revision: 1021240

URL: http://svn.apache.org/viewvc?rev=1021240&view=rev
Log:
Add command-line interface for job management

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/bin/hama
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/ClusterStatus.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1021240&r1=1021239&r2=1021240&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Mon Oct 11 05:15:48 2010
@@ -4,6 +4,7 @@ Trunk (unreleased changes)
 
   NEW FEATURES
 
+    HAMA-267: Add command-line interface for job management (hyunsik via edwardyoon)
     HAMA-279: Add "serialize printing" to examples (edwardyoon)
     HAMA-272: Hama/Zookeeper Integration (edwardyoon)
     HAMA-265: Add example Pi estimatior based on BSP (edwardyoon)

Modified: incubator/hama/trunk/bin/hama
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/bin/hama?rev=1021240&r1=1021239&r2=1021240&view=diff
==============================================================================
--- incubator/hama/trunk/bin/hama (original)
+++ incubator/hama/trunk/bin/hama Mon Oct 11 05:15:48 2010
@@ -58,6 +58,7 @@ if [ $# = 0 ]; then
   echo "  bspmaster            run the BSP Master node"
   echo "  groom                run the Groom node"
   echo "  zookeeper            run a Zookeeper server"
+  echo "  job                  manipulate BSP jobs"
   echo "  jar <jar>            run a jar file"
   echo " or"
   echo "  CLASSNAME            run the class named CLASSNAME"
@@ -162,6 +163,8 @@ elif [ "$COMMAND" = "groom" ] ; then
   BSP_OPTS="$BSP_OPTS $BSP_GROOMSERVER_OPTS"
 elif [ "$COMMAND" = "zookeeper" ] ; then
   CLASS='org.apache.hama.ZooKeeperRunner'
+elif [ "$COMMAND" = "job" ] ; then
+  CLASS='org.apache.hama.bsp.BSPJobClient'
 elif [ "$COMMAND" = "jar" ] ; then
   CLASS=org.apache.hama.util.RunJar
   BSP_OPTS="$BSP_OPTS"

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java?rev=1021240&r1=1021239&r2=1021240&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java Mon Oct 11 05:15:48 2010
@@ -19,6 +19,7 @@ package org.apache.hama.bsp;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.Collection;
 
 import javax.security.auth.login.LoginException;
 
@@ -34,9 +35,12 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hama.HamaConfiguration;
 import org.apache.hama.ipc.JobSubmissionProtocol;
 
-public class BSPJobClient extends Configured {
+public class BSPJobClient extends Configured implements Tool {
   private static final Log LOG = LogFactory.getLog(BSPJobClient.class);
 
   public static enum TaskStatusFilter {
@@ -177,6 +181,9 @@ public class BSPJobClient extends Config
     init(conf);
   }
 
+  public BSPJobClient() {
+  }
+
   public void init(Configuration conf) throws IOException {
     // it will be used to determine if the bspmaster is running on local or not.
     String master = conf.get("bsp.master.address", "local");
@@ -211,6 +218,26 @@ public class BSPJobClient extends Config
     return fs;
   }
 
+  /**
+   * Gets the status of every job that has been submitted.
+   * 
+   * @return array of {@link JobStatus} for the submitted jobs.
+   * @throws IOException if the job submission client reports an error
+   */
+  public JobStatus[] getAllJobs() throws IOException {
+    return jobSubmitClient.getAllJobs();
+  }
+
+  /**
+   * Gets the jobs that are neither completed nor failed.
+   * 
+   * @return array of {@link JobStatus} for the running/to-be-run jobs.
+   * @throws IOException if the job submission client reports an error
+   */
+  public JobStatus[] jobsToComplete() throws IOException {
+    return jobSubmitClient.jobsToComplete();
+  }
+
   private UnixUserGroupInformation getUGI(Configuration conf)
       throws IOException {
     UnixUserGroupInformation ugi = null;
@@ -369,18 +396,172 @@ public class BSPJobClient extends Config
     }
 
     // TODO if error found, kill job
-    //running.killJob();
+    // running.killJob();
     jc.close();
   }
 
   /**
    * Get status information about the BSP cluster
    * 
+   * @param detailed if true then get a detailed status including the
+   *          groomserver names
+   * 
+   * @return the status information about the BSP cluster as an object of
+   *         {@link ClusterStatus}.
+   * 
+   * @throws IOException if the status cannot be retrieved
+   */
+  public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
+    return jobSubmitClient.getClusterStatus(detailed);
+  }
+
+  /**
+   * Runs the {@code hama job} command-line tool.
+   * 
+   * @param args the command followed by its arguments
+   * @return 0 on success, -1 if the arguments are missing or invalid
+   * @throws Exception if client initialization or the listing fails
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    int exitCode = -1;
+    if (args.length < 1) {
+      displayUsage("");
+      return exitCode;
+    }
+
+    // process arguments
+    String cmd = args[0];
+    boolean listJobs = false;
+    boolean listAllJobs = false;
+    boolean listActiveTrackers = false;
+
+    if ("-list".equals(cmd)) {
+      if (args.length != 1 && !(args.length == 2 && "all".equals(args[1]))) {
+        displayUsage(cmd);
+        return exitCode;
+      }
+      if (args.length == 2 && "all".equals(args[1])) {
+        listAllJobs = true;
+      } else {
+        listJobs = true;
+      }
+    } else if ("-list-active-grooms".equals(cmd)) {
+      if (args.length != 1) {
+        displayUsage(cmd);
+        return exitCode;
+      }
+      listActiveTrackers = true;
+    } else {
+      // Unknown command: print usage and fail rather than exit with 0.
+      displayUsage(cmd);
+      return exitCode;
+    }
+
+    // Initialize only after validation, so usage errors skip init().
+    HamaConfiguration conf = new HamaConfiguration(getConf());
+    init(conf);
+
+    if (listJobs) {
+      listJobs();
+      exitCode = 0;
+    } else if (listAllJobs) {
+      listAllJobs();
+      exitCode = 0;
+    } else if (listActiveTrackers) {
+      listActiveTrackers();
+      exitCode = 0;
+    }
+
+    return exitCode;
+  }
+
+  /**
+   * Prints usage information to standard error. This method does not
+   * terminate execution; callers return a non-zero exit code themselves.
+   * 
+   * @param cmd the command to describe; any other value (e.g. "") prints
+   *          the full list of supported commands
+   */
+  private void displayUsage(String cmd) {
+    String prefix = "Usage: hama job ";
+    if ("-list".equals(cmd)) {
+      System.err.println(prefix + "[" + cmd + " [all]]");
+    } else if ("-list-active-grooms".equals(cmd)) {
+      System.err.println(prefix + "[" + cmd + "]");
+    } else {
+      System.err.println(prefix + "<command> <args>");
+      System.err.println("\t[-list [all]]");
+      System.err.println("\t[-list-active-grooms]");
+      System.err.println();
+    }
+  }
+
+  /**
+   * Dump a list of the jobs that are neither completed nor failed.
+   * 
+   * @throws IOException if the job list cannot be retrieved
+   */
+  private void listJobs() throws IOException {
+    JobStatus[] jobs = jobsToComplete();
+    if (jobs == null) {
+      jobs = new JobStatus[0];
+    }
+
+    System.out.printf("%d jobs currently running\n", jobs.length);
+    displayJobList(jobs);
+  }
+
+  /**
+   * Dump a list of all jobs submitted.
+   * 
    * @throws IOException
    */
-  public ClusterStatus getClusterStatus() throws IOException {
-    // TODO:
+  private void listAllJobs() throws IOException {
+    JobStatus[] jobs = getAllJobs();
+    if (jobs == null) {
+      jobs = new JobStatus[0];
+    }
+    System.out.printf("%d jobs submitted\n", jobs.length);
+    // NOTE(review): these codes must match the JobStatus run-state values.
+    System.out.printf("States are:\n\tRunning : 1\tSucceeded : 2"
+        + "\tFailed : 3\tPrep : 4\n");
+    displayJobList(jobs);
+  }
+
+  /**
+   * Prints a header and then one tab-separated line per job: id, run
+   * state, start time and user name.
+   * 
+   * @param jobs the jobs to print; must not be null
+   */
+  void displayJobList(JobStatus[] jobs) {
+    System.out.printf("JobId\tState\tStartTime\tUserName\n");
+    for (JobStatus job : jobs) {
+      System.out.printf("%s\t%d\t%d\t%s\n", job.getJobID(), job.getRunState(),
+          job.getStartTime(), job.getUsername());
+    }
+  }
 
-    return null;
+  /**
+   * Prints the name of every active groom server, one per line.
+   * 
+   * @throws IOException if the cluster status cannot be retrieved
+   */
+  private void listActiveTrackers() throws IOException {
+    ClusterStatus c = getClusterStatus(true);
+    Collection<String> trackers = c.getActiveGroomNames();
+    for (String trackerName : trackers) {
+      System.out.println(trackerName);
+    }
+  }
+
+  /**
+   * Command-line entry point: runs this tool through {@link ToolRunner} and
+   * exits the JVM with the code returned by {@link #run(String[])}.
+   */
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new BSPJobClient(), args);
+    System.exit(res);
   }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=1021240&r1=1021239&r2=1021240&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Mon Oct 11 05:15:48 2010
@@ -571,6 +571,9 @@ public class BSPMaster implements JobSub
 
   @Override
   public JobStatus[] getAllJobs() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("returns all jobs: " + jobs.size());
+    }
     return getJobStatus(jobs.values(), false);
   }
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/ClusterStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/ClusterStatus.java?rev=1021240&r1=1021239&r2=1021240&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/ClusterStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/ClusterStatus.java Mon Oct 11 05:15:48 2010
@@ -50,7 +50,7 @@ import org.apache.hadoop.io.WritableUtil
  * </ol></p>
  * 
  * <p>Clients can query for the latest <code>ClusterStatus</code>, via
- * {@link BSPJobClient#getClusterStatus()}.</p>
+ * {@link BSPJobClient#getClusterStatus(boolean)}.</p>
  * 
  * @see BSPMaster
  */

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java?rev=1021240&r1=1021239&r2=1021240&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java Mon Oct 11 05:15:48 2010
@@ -50,6 +50,7 @@ public class JobStatus implements Writab
   private int runState;
   private long startTime;
   private String schedulingInfo = "NA";
+  private String user;
 
   public JobStatus() {
   }
@@ -116,6 +117,20 @@ public class JobStatus implements Writab
     return startTime;
   }
 
+  /**
+   * @param userName the username of the job
+   */
+  synchronized void setUsername(String userName) {
+    this.user = userName;
+  }
+
+  /**
+   * @return the username of the job, or null if it was never set
+   */
+  public synchronized String getUsername() {
+    return this.user;
+  }
+
   @Override
   public Object clone() {
     try {



Mime
View raw message