incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1041693 - in /incubator/hama/trunk: ./ src/java/org/apache/hama/bsp/
Date Fri, 03 Dec 2010 04:58:51 GMT
Author: edwardyoon
Date: Fri Dec  3 04:58:51 2010
New Revision: 1041693

URL: http://svn.apache.org/viewvc?rev=1041693&view=rev
Log:
Removing JobInProgressListener and adding JobInitThread to BSPMaster

Removed:
    incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgressListener.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobID.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerManager.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1041693&r1=1041692&r2=1041693&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Fri Dec  3 04:58:51 2010
@@ -50,6 +50,8 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
     
+    HAMA-331: Removing JobInProgressListener 
+                       and adding JobInitThread to BSPMaster (edwardyoon)
     HAMA-334: Removing "java5.home" env key from build script. (edwardyoon)
     HAMA-332: Renaming some members in BSPMaster 
                 to follow the Hama terminology (Filipe Manana via edwardyoon)

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java?rev=1041693&r1=1041692&r2=1041693&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java Fri Dec  3 04:58:51
2010
@@ -460,7 +460,9 @@ public class BSPJobClient extends Config
     boolean listJobs = false;
     boolean listAllJobs = false;
     boolean listActiveTrackers = false;
-
+    boolean killJob = false;
+    String jobid = null;
+    
     HamaConfiguration conf = new HamaConfiguration(getConf());
     init(conf);
 
@@ -480,8 +482,16 @@ public class BSPJobClient extends Config
         return exitCode;
       }
       listActiveTrackers = true;
+    } else if ("-kill".equals(cmd)) {
+      if (args.length == 1) {
+        displayUsage(cmd);
+        return exitCode;
+      }
+      killJob = true;
+      jobid = args[1];
     }
 
+    BSPJobClient jc = new BSPJobClient(new HamaConfiguration());
     if (listJobs) {
       listJobs();
       exitCode = 0;
@@ -491,6 +501,15 @@ public class BSPJobClient extends Config
     } else if (listActiveTrackers) {
       listActiveTrackers();
       exitCode = 0;
+    } else if (killJob) {
+      RunningJob job = jc.getJob(new BSPJobID().forName(jobid));
+      if (job == null) {
+          System.out.println("Could not find job " + jobid);
+      } else {
+          job.killJob();
+          System.out.println("Killed job " + jobid);
+      }
+      exitCode = 0;
     }
 
     return 0;

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobID.java?rev=1041693&r1=1041692&r2=1041693&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobID.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobID.java Fri Dec  3 04:58:51 2010
@@ -97,7 +97,7 @@ public class BSPJobID extends ID impleme
     jtIdentifier.write(out);
   }
 
-  public static BSPJobID forName(String str) throws IllegalArgumentException {
+  public BSPJobID forName(String str) throws IllegalArgumentException {
     if (str == null)
       return null;
     try {

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=1041693&r1=1041692&r2=1041693&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Fri Dec  3 04:58:51 2010
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.Vector;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -65,7 +66,8 @@ public class BSPMaster implements JobSub
 
   private static final int FS_ACCESS_RETRY_PERIOD = 10000;
   public static final long GROOMSERVER_EXPIRY_INTERVAL = 10 * 60 * 1000;
-
+  static long JOBINIT_SLEEP_INTERVAL = 2000;
+  
   // States
   State state = State.INITIALIZING;
 
@@ -104,6 +106,9 @@ public class BSPMaster implements JobSub
   TreeMap<String, TreeSet<String>> groomNameToTaskIdsMap = new TreeMap<String,
TreeSet<String>>();
   Map<String, TaskInProgress> taskIdToTaskInProgressMap = new TreeMap<String, TaskInProgress>();
 
+  Vector<JobInProgress> jobInitQueue = new Vector<JobInProgress>();
+  JobInitThread initJobs = new JobInitThread();
+
   /**
    * Start the BSPMaster process, listen on the indicated hostname/port
    */
@@ -190,6 +195,43 @@ public class BSPMaster implements JobSub
     return activeGrooms;
   }
 
+  /////////////////////////////////////////////////////////////////
+  //  Used to init new jobs that have just been created
+  /////////////////////////////////////////////////////////////////
+  class JobInitThread implements Runnable {
+    private volatile boolean shouldRun = true;
+    
+      public JobInitThread() {
+      }
+      public void run() {
+          while (shouldRun) {
+              JobInProgress job = null;
+              synchronized (jobInitQueue) {
+                  if (jobInitQueue.size() > 0) {
+                      job = (JobInProgress) jobInitQueue.elementAt(0);
+                      jobInitQueue.remove(job);
+                  } else {
+                      try {
+                          jobInitQueue.wait(JOBINIT_SLEEP_INTERVAL);
+                      } catch (InterruptedException iex) {
+                      }
+                  }
+              }
+              try {
+                  if (job != null) {
+                      job.initTasks();
+                  }
+              } catch (Exception e) {
+                  LOG.warn("job init failed: " + e);
+                  job.kill();
+              }
+          }
+      }
+      public void stopIniter() {
+          shouldRun = false;
+      }
+  }
+  
   // /////////////////////////////////////////////////////////////
   // BSPMaster methods
   // /////////////////////////////////////////////////////////////
@@ -259,6 +301,9 @@ public class BSPMaster implements JobSub
   }
 
   public void offerService() throws InterruptedException, IOException {
+    new Thread(this.initJobs).start();
+    LOG.info("Starting jobInitThread");
+    
     this.interTrackerServer.start();
 
     synchronized (this) {
@@ -271,57 +316,6 @@ public class BSPMaster implements JobSub
   }
 
   // //////////////////////////////////////////////////
-  // GroomServerManager
-  // //////////////////////////////////////////////////
-  @Override
-  public void addJobInProgressListener(JobInProgressListener listener) {
-    // jobInProgressListeners.add(listener);
-  }
-
-  @Override
-  public void removeJobInProgressListener(JobInProgressListener listener) {
-    // jobInProgressListeners.remove(listener);
-  }
-
-  @Override
-  public void failJob(JobInProgress job) {
-    // TODO Auto-generated method stub
-  }
-
-  @Override
-  public JobInProgress getJob(BSPJobID jobid) {
-    return jobs.get(jobid);
-  }
-
-  @Override
-  public int getNextHeartbeatInterval() {
-    // TODO Auto-generated method stub
-    return 0;
-  }
-
-  @Override
-  public int getNumberOfUniqueHosts() {
-    // TODO Auto-generated method stub
-    return 1;
-  }
-
-  @Override
-  public Collection<GroomServerStatus> grooms() {
-    return groomServers.values();
-  }
-
-  @Override
-  public void initJob(JobInProgress job) {
-    if (null == job) {
-      LOG.info("Init on null job is not valid");
-      return;
-    }
-
-    // JobStatus prevStatus = (JobStatus)job.getStatus().clone();
-    LOG.info("Initializing " + job.getJobID());
-  }
-
-  // //////////////////////////////////////////////////
   // InterTrackerProtocol
   // //////////////////////////////////////////////////
   @Override
@@ -451,7 +445,6 @@ public class BSPMaster implements JobSub
 
     // taskid --> TIP
     taskIdToTaskInProgressMap.remove(taskid);
-
     LOG.debug("Removing task '" + taskid + "'");
   }
 
@@ -571,17 +564,12 @@ public class BSPMaster implements JobSub
   private synchronized JobStatus addJob(BSPJobID jodId, JobInProgress job) {
     totalSubmissions++;
     synchronized (jobs) {
-      jobs.put(job.getProfile().getJobID(), job);
-      taskScheduler.addJob(job);
-    }
-    
-    
-    // TODO Later, we should use the JobInProgressListener -- edwardyoon
-    try {
-      job.initTasks();
-    } catch (IOException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
+      synchronized (jobInitQueue) {
+        jobs.put(job.getProfile().getJobID(), job);
+        taskScheduler.addJob(job);
+        jobInitQueue.add(job);
+        jobInitQueue.notifyAll();
+      }
     }
     
     return job.getStatus();

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java?rev=1041693&r1=1041692&r2=1041693&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java Fri Dec  3 04:58:51
2010
@@ -66,4 +66,9 @@ public class BSPTaskRunner extends Threa
     }
   }
 
+  public void kill() {
+    // TODO Auto-generated method stub
+    LOG.debug(">>>> Kill Task Runner");
+  }
+
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1041693&r1=1041692&r2=1041693&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Fri Dec  3 04:58:51
2010
@@ -516,7 +516,7 @@ public class GroomServer implements Runn
   class TaskInProgress {
     Task task;
     BSPJob jobConf;
-    private BSPTaskRunner runner;
+    BSPTaskRunner runner;
     volatile boolean done = false;
     volatile boolean wasKilled = false;
     private TaskStatus taskStatus;
@@ -548,8 +548,7 @@ public class GroomServer implements Runn
         }
 
         if (bspPeer.getLocalQueueSize() == 0
-            && bspPeer.getOutgoingQueueSize() == 0
-            && !runner.isAlive()) {
+            && bspPeer.getOutgoingQueueSize() == 0 && !runner.isAlive())
{
           taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
           acceptNewTasks = true;
           break;
@@ -559,6 +558,15 @@ public class GroomServer implements Runn
     }
 
     /**
+     * This task has run on too long, and should be killed.
+     */
+    public synchronized void killAndCleanup(boolean wasFailure)
+        throws IOException {
+      // TODO 
+      runner.kill();
+    }
+
+    /**
      */
     public Task getTask() {
       return task;

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerManager.java?rev=1041693&r1=1041692&r2=1041693&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerManager.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerManager.java Fri Dec  3 04:58:51
2010
@@ -17,8 +17,6 @@
  */
 package org.apache.hama.bsp;
 
-import java.io.IOException;
-import java.util.Collection;
 
 /**
  * Manages information about the {@link GroomServer}s running on a cluster.
@@ -28,72 +26,10 @@ import java.util.Collection;
 interface GroomServerManager {
 
   /**
-   * @return A collection of the {@link GroomServerStatus} for the grooms
-   * being managed.
-   */
-  public Collection<GroomServerStatus> grooms();
-  
-  /**
-   * @return The number of unique hosts running grooms.
-   */
-  public int getNumberOfUniqueHosts();
-  
-  /**
    * Get the current status of the cluster
    * @param detailed if true then report groom names as well
    * @return summary of the state of the cluster
    */
   public ClusterStatus getClusterStatus(boolean detailed);
-
-  /**
-   * Registers a {@link JobInProgressListener} for updates from this
-   * {@link GroomServerManager}.
-   * @param jobInProgressListener the {@link JobInProgressListener} to add
-   */
-  public void addJobInProgressListener(JobInProgressListener listener);
-
-  /**
-   * Unregisters a {@link JobInProgressListener} from this
-   * {@link GroomServerManager}.
-   * @param jobInProgressListener the {@link JobInProgressListener} to remove
-   */
-  public void removeJobInProgressListener(JobInProgressListener listener);
-
-  /**
-   * Return the current heartbeat interval that's used by {@link GroomServer}s.
-   *
-   * @return the heartbeat interval used by {@link GroomServer}s
-   */
-  public int getNextHeartbeatInterval();
-
-  /**
-   * Kill the job identified by jobid
-   * 
-   * @param jobid
-   * @throws IOException
-   */
-  public void killJob(BSPJobID jobid)
-      throws IOException;
-
-  /**
-   * Obtain the job object identified by jobid
-   * 
-   * @param jobid
-   * @return jobInProgress object
-   */
-  public JobInProgress getJob(BSPJobID jobid);
-  
-  /**
-   * Initialize the Job
-   * 
-   * @param job JobInProgress object
-   */
-  public void initJob(JobInProgress job);
   
-  /**
-   * Fail a job.
-   * 
-   * @param job JobInProgress object
-   */
-  public void failJob(JobInProgress job);
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java?rev=1041693&r1=1041692&r2=1041693&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java Fri Dec  3 04:58:51
2010
@@ -18,8 +18,6 @@
 package org.apache.hama.bsp;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -161,30 +159,30 @@ class JobInProgress {
 
     // Update job status
     this.status = new JobStatus(this.status.getJobID(), 1.0f, 1.0f,
-       JobStatus.RUNNING);
+        JobStatus.RUNNING);
 
     tasksInited = true;
     LOG.debug("Job is initialized.");
   }
 
   public synchronized Task obtainNewTask(GroomServerStatus status,
-      int clusterSize, int numUniqueHosts) {
+      int clusterSize) {
     this.clusterSize = clusterSize;
-    
+
     if (this.status.getRunState() != JobStatus.RUNNING) {
       LOG.info("Cannot create task split for " + profile.getJobID());
       return null;
     }
-    
+
     Task result = null;
     try {
       for (int i = 0; i < tasks.length; i++) {
-        if(!tasks[i].isRunning()) {
+        if (!tasks[i].isRunning()) {
           result = tasks[i].getTaskToRun(status);
           break;
         }
       }
-      
+
     } catch (IOException e) {
       e.printStackTrace();
     }
@@ -230,8 +228,22 @@ class JobInProgress {
     }
   }
 
-  public void kill() {
-    // TODO Auto-generated method stub
+  public synchronized void kill() {
+    LOG.debug(">> JobInProgress.kill() step.");
+    if (status.getRunState() != JobStatus.FAILED) {
+      this.status = new JobStatus(status.getJobID(), 1.0f, 1.0f, 1.0f,
+          JobStatus.FAILED);
+      this.finishTime = System.currentTimeMillis();
+
+      //
+      // kill all TIPs.
+      //
+      for (int i = 0; i < tasks.length; i++) {
+        tasks[i].kill();
+      }
+
+      garbageCollect();
+    }
 
   }
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=1041693&r1=1041692&r2=1041693&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java Fri Dec  3
04:58:51 2010
@@ -56,7 +56,7 @@ class SimpleTaskScheduler extends TaskSc
    * GroomServerStatus)
    */
   @Override
-  public List<Task> assignTasks(GroomServerStatus groomStatus)
+  public synchronized List<Task> assignTasks(GroomServerStatus groomStatus)
       throws IOException {
     ClusterStatus clusterStatus = groomServerManager.getClusterStatus(false);
 
@@ -70,7 +70,6 @@ class SimpleTaskScheduler extends TaskSc
     // Assigned tasks
     List<Task> assignedTasks = new ArrayList<Task>();
 
-    // Task task = null;
     if (groomRunningTasks == 0) {
       // TODO - Each time a job is submitted in BSPMaster, add a JobInProgress
       // instance to the scheduler.
@@ -81,9 +80,7 @@ class SimpleTaskScheduler extends TaskSc
           }
 
           Task t = null;
-
-          t = job.obtainNewTask(groomStatus, numGroomServers,
-              groomServerManager.getNumberOfUniqueHosts());
+          t = job.obtainNewTask(groomStatus, numGroomServers);
 
           if (t != null) {
             assignedTasks.add(t);

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java?rev=1041693&r1=1041692&r2=1041693&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java Fri Dec  3 04:58:51
2010
@@ -37,6 +37,7 @@ class TaskInProgress {
   // Constants
   static final int MAX_TASK_EXECS = 1;
   int maxTaskAttempts = 4;
+  private boolean failed = false;
 
   // Job Meta
   private String jobFile = null;
@@ -80,7 +81,7 @@ class TaskInProgress {
     this.job = job;
     this.conf = conf;
     this.partition = partition;
-    
+
     this.id = new TaskID(jobId, true, partition);
   }
 
@@ -88,27 +89,27 @@ class TaskInProgress {
    * Return a Task that can be sent to a GroomServer for execution.
    */
   public Task getTaskToRun(GroomServerStatus status) throws IOException {
-      Task t = null;
-      
-      // TODO use the TaskID, instead of String. 
-      String taskid = null;
-      if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts)) {
-        taskid = new String("task_" + status.getGroomName() + "_" + nextTaskId);
-        ++nextTaskId;
-      } else {
-        LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + maxTaskAttempts) + 
-                " attempts for the tip '" + getTIPId() + "'");
-        return null;
-      }
-
-      t = new BSPTask(jobId, jobFile, taskid, partition);
-      activeTasks.put(taskid, status.getGroomName());
-
-      // Ask JobTracker to note that the task exists
-      bspMaster.createTaskEntry(taskid, status.getGroomName(), this);
-      return t;
+    Task t = null;
+
+    // TODO use the TaskID, instead of String.
+    String taskid = null;
+    if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts)) {
+      taskid = new String("task_" + status.getGroomName() + "_" + nextTaskId);
+      ++nextTaskId;
+    } else {
+      LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + maxTaskAttempts)
+          + " attempts for the tip '" + getTIPId() + "'");
+      return null;
+    }
+
+    t = new BSPTask(jobId, jobFile, taskid, partition);
+    activeTasks.put(taskid, status.getGroomName());
+
+    // Ask JobTracker to note that the task exists
+    bspMaster.createTaskEntry(taskid, status.getGroomName(), this);
+    return t;
   }
-  
+
   // //////////////////////////////////
   // Accessors
   // //////////////////////////////////
@@ -133,7 +134,7 @@ class TaskInProgress {
   public TreeMap<String, String> getTasks() {
     return activeTasks;
   }
-  
+
   /**
    * Is the Task associated with taskid is the first attempt of the tip?
    * 
@@ -163,16 +164,15 @@ class TaskInProgress {
   }
 
   private TreeSet<String> tasksReportedClosed = new TreeSet<String>();
-  
+
   public boolean shouldCloseForClosedJob(String taskid) {
     TaskStatus ts = (TaskStatus) taskStatuses.get(taskid);
-    if ((ts != null) &&
-        (! tasksReportedClosed.contains(taskid)) &&
-        (job.getStatus().getRunState() != JobStatus.RUNNING)) {
-        tasksReportedClosed.add(taskid);
-        return true;
-    }  else {
-        return false;
+    if ((ts != null) && (!tasksReportedClosed.contains(taskid))
+        && (job.getStatus().getRunState() != JobStatus.RUNNING)) {
+      tasksReportedClosed.add(taskid);
+      return true;
+    } else {
+      return false;
     }
   }
 
@@ -183,8 +183,8 @@ class TaskInProgress {
     activeTasks.remove(taskid);
 
     //
-    // Now that the TIP is complete, the other speculative 
-    // subtasks will be closed when the owning tasktracker 
+    // Now that the TIP is complete, the other speculative
+    // subtasks will be closed when the owning tasktracker
     // reports in and calls shouldClose() on this object.
     //
 
@@ -194,8 +194,18 @@ class TaskInProgress {
   public void updateStatus(TaskStatus status) {
     taskStatuses.put(status.getTaskId(), status);
   }
-  
+
   public TaskStatus getTaskStatus(String taskId) {
     return this.taskStatuses.get(taskId);
   }
+
+  public void kill() {
+    LOG.debug(">> TaskInProgress.kill() step.");
+    this.failed = true;
+  }
+
+  public boolean isFailed() {
+    return failed;
+  }
+
 }



Mime
View raw message