incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1076130 - in /incubator/hama/trunk: ./ src/java/org/apache/hama/bsp/ src/java/org/apache/hama/ipc/
Date Wed, 02 Mar 2011 07:46:34 GMT
Author: edwardyoon
Date: Wed Mar  2 07:46:33 2011
New Revision: 1076130

URL: http://svn.apache.org/viewvc?rev=1076130&view=rev
Log:
Implement the kill method for JobInProgress

Removed:
    incubator/hama/trunk/src/java/org/apache/hama/bsp/HeartbeatResponse.java
    incubator/hama/trunk/src/java/org/apache/hama/ipc/InterServerProtocol.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRPCProtocolVersion.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/CommitTaskAction.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/Directive.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerAction.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/JobProfile.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/KillJobAction.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/KillTaskAction.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/ReinitGroomAction.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLog.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLogAppender.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1076130&r1=1076129&r2=1076130&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed Mar  2 07:46:33 2011
@@ -51,6 +51,7 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
  
+    HAMA-298: Implement the kill method for JobInProgress (edwardyoon)
     HAMA-353: Add "random communication benchmark" tool (edwardyoon)
     HAMA-351: Improvement of lack of info- about the output of examples (edwardyoon)
     HAMA-348: Remove hard-coded javaOpts (edwardyoon)

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java?rev=1076130&r1=1076129&r2=1076130&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java Wed Mar  2 07:46:33 2011
@@ -363,7 +363,7 @@ public class BSPJobClient extends Config
     LOG.info("Running job: " + info.getID());
 
     while (!job.isComplete()) {
-      Thread.sleep(1000);
+      Thread.sleep(3000);
       long step = job.progress();
       String report = "Current supersteps number: " + step;
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=1076130&r1=1076129&r2=1076130&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Wed Mar  2 07:46:33 2011
@@ -259,6 +259,9 @@ public class BSPMaster implements JobSub
             jip.completedTask(tip, ts);
           } else if (ts.getRunState() == TaskStatus.State.RUNNING) {
             // do nothing
+          } else if (ts.getRunState() == TaskStatus.State.FAILED) {
+            jip.status.setRunState(JobStatus.FAILED);
+            jip.failedTask(tip, ts);
           }
           
           if (jip.getStatus().getRunState() == JobStatus.SUCCEEDED) {
@@ -272,7 +275,16 @@ public class BSPMaster implements JobSub
 
           } else if (jip.getStatus().getRunState() == JobStatus.RUNNING) {
             jip.getStatus().setprogress(ts.getSuperstepCount());
+          } else if (jip.getStatus().getRunState() == JobStatus.KILLED) {
+            
+            WorkerProtocol worker = findGroomServer(ustus);
+            Directive d1 = new Directive(currentGroomServerPeers(),
+                new GroomServerAction[] { new KillTaskAction(ts.getTaskId()) });
+            
+            worker.dispatch(d1);
+            
           }
+          
         }
       } else {
         throw new RuntimeException("BSPMaster contains GroomServerSatus, "

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=1076130&r1=1076129&r2=1076130&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Wed Mar  2 07:46:33 2011
@@ -44,7 +44,7 @@ import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.data.Stat;
 
 /**
- *  
+ * This class represents a BSP peer. 
  */
 public class BSPPeer implements Watcher, BSPPeerInterface {
   public static final Log LOG = LogFactory.getLog(BSPPeer.class);
@@ -59,12 +59,9 @@ public class BSPPeer implements Watcher,
   private final String bspRoot;
   private final String zookeeperAddr;
 
-  private final Map<InetSocketAddress, BSPPeerInterface> peers = 
-    new ConcurrentHashMap<InetSocketAddress, BSPPeerInterface>();
-  private final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues
= 
-    new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
-  private final ConcurrentLinkedQueue<BSPMessage> localQueue = 
-    new ConcurrentLinkedQueue<BSPMessage>();
+  private final Map<InetSocketAddress, BSPPeerInterface> peers = new ConcurrentHashMap<InetSocketAddress,
BSPPeerInterface>();
+  private final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues
= new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
+  private final ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>();
   private SortedSet<String> allPeerNames = new TreeSet<String>();
   private InetSocketAddress peerAddress;
   private TaskStatus currentTaskStatus;
@@ -84,8 +81,8 @@ public class BSPPeer implements Watcher,
         + ":"
         + conf.getInt(Constants.ZOOKEPER_CLIENT_PORT,
             Constants.DEFAULT_ZOOKEPER_CLIENT_PORT);
-    // TODO: may require to dynamic reflect the underlying 
-    //       network e.g. ip address, port.
+    // TODO: may require to dynamic reflect the underlying
+    // network e.g. ip address, port.
     peerAddress = new InetSocketAddress(bindAddress, bindPort);
 
     reinitialize();
@@ -94,10 +91,11 @@ public class BSPPeer implements Watcher,
   public void reinitialize() {
     try {
       LOG.debug("reinitialize(): " + getPeerName());
-      server = RPC.getServer(this, peerAddress.getHostName(), peerAddress
-          .getPort(), conf);
+      server = RPC.getServer(this, peerAddress.getHostName(),
+          peerAddress.getPort(), conf);
       server.start();
-      LOG.info(" BSPPeer address:"+peerAddress.getHostName()+" port:"+peerAddress.getPort());
+      LOG.info(" BSPPeer address:" + peerAddress.getHostName() + " port:"
+          + peerAddress.getPort());
     } catch (IOException e) {
       e.printStackTrace();
     }
@@ -143,7 +141,8 @@ public class BSPPeer implements Watcher,
   @Override
   public void send(String peerName, BSPMessage msg) throws IOException {
     LOG.debug("Send bytes (" + msg.getData().toString() + ") to " + peerName);
-    ConcurrentLinkedQueue<BSPMessage> queue = outgoingQueues.get(getAddress(peerName));
+    ConcurrentLinkedQueue<BSPMessage> queue = outgoingQueues
+        .get(getAddress(peerName));
     if (queue == null) {
       queue = new ConcurrentLinkedQueue<BSPMessage>();
     }
@@ -184,7 +183,7 @@ public class BSPPeer implements Watcher,
 
     waitForSync();
     Thread.sleep(100);
-    
+
     // Clear outgoing queues.
     clearOutgoingQueues();
 
@@ -241,7 +240,7 @@ public class BSPPeer implements Watcher,
   protected boolean leaveBarrier() throws KeeperException, InterruptedException {
     zk.delete(bspRoot + "/" + getPeerName(), 0);
     zk.delete(bspRoot + "/" + getPeerName() + "-data", 0);
-    
+
     while (true) {
       synchronized (mutex) {
         List<String> list = zk.getChildren(bspRoot, true);
@@ -266,7 +265,7 @@ public class BSPPeer implements Watcher,
     this.localQueue.clear();
     this.outgoingQueues.clear();
   }
-  
+
   @Override
   public void close() throws IOException {
     server.stop();
@@ -310,8 +309,8 @@ public class BSPPeer implements Watcher,
 
   private InetSocketAddress getAddress(String peerName) {
     String[] peerAddrParts = peerName.split(":");
-    return new InetSocketAddress(peerAddrParts[0], Integer
-        .parseInt(peerAddrParts[1]));
+    return new InetSocketAddress(peerAddrParts[0],
+        Integer.parseInt(peerAddrParts[1]));
   }
 
   @Override
@@ -381,7 +380,7 @@ public class BSPPeer implements Watcher,
   public void clearLocalQueue() {
     this.localQueue.clear();
   }
-  
+
   /**
    * Clears outgoing queues
    */

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRPCProtocolVersion.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRPCProtocolVersion.java?rev=1076130&r1=1076129&r2=1076130&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRPCProtocolVersion.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRPCProtocolVersion.java Wed Mar  2 07:46:33 2011
@@ -22,7 +22,7 @@ package org.apache.hama.bsp;
 import org.apache.hadoop.ipc.VersionedProtocol;
 
 /**
- * 
+ * RPC Protocol version
  */
 public interface BSPRPCProtocolVersion extends VersionedProtocol {
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/CommitTaskAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/CommitTaskAction.java?rev=1076130&r1=1076129&r2=1076130&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/CommitTaskAction.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/CommitTaskAction.java Wed Mar  2 07:46:33 2011
@@ -25,7 +25,6 @@ import java.io.IOException;
  * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} 
  * to the {@link org.apache.hama.bsp.GroomServer} to commit the output
  * of the task.
- * 
  */
 class CommitTaskAction extends GroomServerAction {
   private TaskAttemptID taskId;

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/Directive.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Directive.java?rev=1076130&r1=1076129&r2=1076130&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/Directive.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Directive.java Wed Mar  2 07:46:33 2011
@@ -33,7 +33,6 @@ import org.apache.hadoop.io.WritableUtil
 /**
  * A generic directive from the {@link org.apache.hama.bsp.BSPMaster} to the
  * {@link org.apache.hama.bsp.GroomServer} to take some 'action'.
- * 
  */
 public class Directive implements Writable {
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1076130&r1=1076129&r2=1076130&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Wed Mar  2 07:46:33 2011
@@ -49,9 +49,9 @@ import org.apache.hadoop.metrics.Metrics
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.DiskChecker;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.ipc.MasterProtocol;
@@ -141,8 +141,9 @@ public class GroomServer implements Runn
     }
 
     if (localHostname == null) {
-      this.localHostname = DNS.getDefaultHost(conf.get("bsp.dns.interface",
-          "default"), conf.get("bsp.dns.nameserver", "default"));
+      this.localHostname = DNS.getDefaultHost(
+          conf.get("bsp.dns.interface", "default"),
+          conf.get("bsp.dns.nameserver", "default"));
     }
     // check local disk
     checkLocalDirs(conf.getStrings("bsp.local.dir"));
@@ -238,12 +239,17 @@ public class GroomServer implements Runn
         if (action instanceof LaunchTaskAction) {
           startNewTask((LaunchTaskAction) action);
         } else {
-          try {
-            tasksToCleanup.put(action);
-          } catch (InterruptedException e) {
-            LOG.error("Fail to move action to cleanup list.");
-            e.printStackTrace();
+
+          // TODO Use the cleanup thread
+          // tasksToCleanup.put(action);
+          
+          KillTaskAction killAction = (KillTaskAction) action;
+          if (tasks.containsKey(killAction.getTaskID())) {
+            TaskInProgress tip = tasks.get(killAction.getTaskID());
+            tip.taskStatus.setRunState(TaskStatus.State.FAILED);
+            tip.killAndCleanup(true);
           }
+
         }
       }
     }
@@ -306,6 +312,25 @@ public class GroomServer implements Runn
     while (running && !shuttingDown) {
       try {
         Thread.sleep(REPORT_INTERVAL);
+
+        // Reports to a BSPMaster
+        for (Map.Entry<TaskAttemptID, TaskInProgress> e : runningTasks
+            .entrySet()) {
+          TaskInProgress tip = e.getValue();
+          TaskStatus taskStatus = tip.getStatus();
+          taskStatus.setProgress(bspPeer.getSuperstepCount());
+
+          if (bspPeer.getLocalQueueSize() == 0
+              && bspPeer.getOutgoingQueueSize() == 0 && !tip.runner.isAlive())
{
+            if (taskStatus.getRunState() != TaskStatus.State.FAILED) {
+              taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+            }
+            taskStatus.setPhase(TaskStatus.Phase.CLEANUP);
+          }
+
+          doReport(taskStatus);
+        }
+
       } catch (InterruptedException ie) {
       }
 
@@ -362,6 +387,44 @@ public class GroomServer implements Runn
     }
   }
 
+  /**
+   * Update and report refresh status back to BSPMaster.
+   */
+  public void doReport(TaskStatus taskStatus) {
+    GroomServerStatus gss = new GroomServerStatus(groomServerName,
+        bspPeer.getPeerName(), updateTaskStatus(taskStatus), failures,
+        maxCurrentTasks, rpcServer);
+    try {
+      boolean ret = masterClient.report(new Directive(gss));
+      if (!ret) {
+        LOG.warn("Fail to renew BSPMaster's GroomServerStatus. "
+            + " groom name: " + gss.getGroomName() + " peer name:"
+            + gss.getPeerName() + " rpc server:" + rpcServer);
+      }
+    } catch (IOException ioe) {
+      LOG.error("Fail to communicate with BSPMaster for reporting.", ioe);
+    }
+  }
+
+  public List<TaskStatus> updateTaskStatus(TaskStatus taskStatus) {
+    List<TaskStatus> tlist = new ArrayList<TaskStatus>();
+    synchronized (runningTasks) {
+
+      if (taskStatus.getRunState() == TaskStatus.State.SUCCEEDED
+          || taskStatus.getRunState() == TaskStatus.State.FAILED) {
+        synchronized (finishedTasks) {
+          TaskInProgress tip = runningTasks.remove(taskStatus.getTaskId());
+          tlist.add((TaskStatus) taskStatus.clone());
+          finishedTasks.put(taskStatus.getTaskId(), tip);
+        }
+      } else if (taskStatus.getRunState() == TaskStatus.State.RUNNING) {
+        tlist.add((TaskStatus) taskStatus.clone());
+      }
+
+    }
+    return tlist;
+  }
+
   private void localizeJob(TaskInProgress tip) throws IOException {
     Task task = tip.getTask();
     conf.addResource(task.getJobFile());
@@ -609,63 +672,6 @@ public class GroomServer implements Runn
       bspPeer.setCurrentTaskStatus(taskStatus);
       this.runner = task.createRunner(GroomServer.this);
       this.runner.start();
-
-      // Check state of a Task
-      while (true) {
-        try {
-          taskStatus.setProgress(bspPeer.getSuperstepCount());
-          doReport(this.taskStatus);
-          Thread.sleep(REPORT_INTERVAL);
-        } catch (InterruptedException e) {
-          e.printStackTrace();
-        }
-
-        if (bspPeer.getLocalQueueSize() == 0
-            && bspPeer.getOutgoingQueueSize() == 0 && !runner.isAlive())
{
-          taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
-          taskStatus.setPhase(TaskStatus.Phase.CLEANUP);
-          doReport(this.taskStatus);
-          break;
-        }
-      }
-
-    }
-
-    /**
-     * Update and report refresh status back to BSPMaster.
-     */
-    private void doReport(TaskStatus taskStatus) {
-      GroomServerStatus gss = new GroomServerStatus(groomServerName, bspPeer
-          .getPeerName(), updateTaskStatus(taskStatus), failures,
-          maxCurrentTasks, rpcServer);
-      try {
-        boolean ret = masterClient.report(new Directive(gss));
-        if (!ret) {
-          LOG.warn("Fail to renew BSPMaster's GroomServerStatus. "
-              + " groom name: " + gss.getGroomName() + " peer name:"
-              + gss.getPeerName() + " rpc server:" + rpcServer);
-        }
-      } catch (IOException ioe) {
-        LOG.error("Fail to communicate with BSPMaster for reporting.", ioe);
-      }
-    }
-
-    private List<TaskStatus> updateTaskStatus(TaskStatus taskStatus) {
-      List<TaskStatus> tlist = new ArrayList<TaskStatus>();
-      synchronized (runningTasks) {
-
-        if (taskStatus.getRunState() == TaskStatus.State.SUCCEEDED) {
-          synchronized (finishedTasks) {
-            TaskInProgress tip = runningTasks.remove(taskStatus.getTaskId());
-            tlist.add((TaskStatus) taskStatus.clone());
-            finishedTasks.put(taskStatus.getTaskId(), tip);
-          }
-        } else if (taskStatus.getRunState() == TaskStatus.State.RUNNING) {
-          tlist.add((TaskStatus) taskStatus.clone());
-        }
-
-      }
-      return tlist;
     }
 
     /**
@@ -673,7 +679,6 @@ public class GroomServer implements Runn
      */
     public synchronized void killAndCleanup(boolean wasFailure)
         throws IOException {
-      // TODO
       runner.kill();
     }
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerAction.java?rev=1076130&r1=1076129&r2=1076130&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerAction.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerAction.java Wed Mar  2 07:46:33 2011
@@ -27,7 +27,6 @@ import org.apache.hadoop.io.WritableUtil
 /**
  * A generic directive from the {@link org.apache.hama.bsp.BSPMaster}
  * to the {@link org.apache.hama.bsp.GroomServer} to take some 'action'. 
- * 
  */
 abstract class GroomServerAction implements Writable {
   

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java?rev=1076130&r1=1076129&r2=1076130&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java Wed Mar  2 07:46:33 2011
@@ -243,6 +243,36 @@ class JobInProgress {
     }
   }
 
+  public void failedTask(TaskInProgress tip, TaskStatus status) {
+    TaskAttemptID taskid = status.getTaskId();
+    updateTaskStatus(tip, status);
+    LOG.info("Taskid '" + taskid + "' has failed.");
+    tip.terminated(taskid);
+
+    //
+    // If all tasks are complete, then the job is done!
+    //
+
+    boolean allDone = true;
+    for (TaskInProgress taskInProgress : tasks) {
+      if (!taskInProgress.isFailed()) {
+        allDone = false;
+        break;
+      }
+    }
+
+    if (allDone) {
+      this.status = new JobStatus(this.status.getJobID(), this.profile
+          .getUser(), superstepCounter, superstepCounter, superstepCounter, JobStatus.FAILED,
superstepCounter);
+      this.finishTime = System.currentTimeMillis();
+      this.status.setFinishTime(this.finishTime);
+      
+      LOG.debug("Job failed.");
+      
+      garbageCollect();
+    }
+  }
+  
   public synchronized void updateTaskStatus(TaskInProgress tip,
       TaskStatus taskStatus) {
     tip.updateStatus(taskStatus); // update tip
@@ -255,10 +285,9 @@ class JobInProgress {
   }
 
   public synchronized void kill() {
-    LOG.debug(">> JobInProgress.kill() step.");
-    if (status.getRunState() != JobStatus.FAILED) {
+    if (status.getRunState() != JobStatus.KILLED) {
       this.status = new JobStatus(status.getJobID(), this.profile.getUser(),
-          0L, 0L, 0L, JobStatus.FAILED);
+          0L, 0L, 0L, JobStatus.KILLED);
       this.finishTime = System.currentTimeMillis();
       this.status.setFinishTime(this.finishTime);
       //

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobProfile.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobProfile.java?rev=1076130&r1=1076129&r2=1076130&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobProfile.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobProfile.java Wed Mar  2 07:46:33 2011
@@ -27,7 +27,7 @@ import org.apache.hadoop.io.WritableFact
 import org.apache.hadoop.io.WritableFactory;
 
 /**
- * A JobProfile tracks job's status
+ * A JobProfile tracks job's status.
  */
 public class JobProfile implements Writable {
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/KillJobAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/KillJobAction.java?rev=1076130&r1=1076129&r2=1076130&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/KillJobAction.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/KillJobAction.java Wed Mar  2 07:46:33 2011
@@ -27,7 +27,6 @@ import org.apache.hadoop.io.Text;
  * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the
  * {@link org.apache.hama.bsp.GroomServer} to kill the task of a job and cleanup
  * resources.
- * 
  */
 class KillJobAction extends GroomServerAction {
   String jobId;

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/KillTaskAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/KillTaskAction.java?rev=1076130&r1=1076129&r2=1076130&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/KillTaskAction.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/KillTaskAction.java Wed Mar  2 07:46:33 2011
@@ -24,7 +24,6 @@ import java.io.IOException;
 /**
  * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} 
  * to the {@link org.apache.hama.bsp.GroomServer} to kill a task.
- * 
  */
 class KillTaskAction extends GroomServerAction {
   TaskAttemptID taskId;

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java?rev=1076130&r1=1076129&r2=1076130&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java Wed Mar  2 07:46:33 2011
@@ -24,7 +24,6 @@ import java.io.IOException;
 /**
  * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the
  * {@link org.apache.hama.bsp.GroomServer} to launch a new task.
- * 
  */
 class LaunchTaskAction extends GroomServerAction {
   private Task task;

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/ReinitGroomAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/ReinitGroomAction.java?rev=1076130&r1=1076129&r2=1076130&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/ReinitGroomAction.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/ReinitGroomAction.java Wed Mar  2 07:46:33 2011
@@ -24,7 +24,6 @@ import java.io.IOException;
 /**
  * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the
  * {@link org.apache.hama.bsp.GroomServer} to reinitialize itself.
- * 
  */
 class ReinitGroomAction extends GroomServerAction {
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java?rev=1076130&r1=1076129&r2=1076130&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java Wed Mar  2 07:46:33 2011
@@ -224,6 +224,14 @@ class TaskInProgress {
 
     this.completes++;
   }
+  
+  public void terminated(TaskAttemptID taskid) {
+    LOG.info("Task '" + taskid.getTaskID().toString() + "' has failed.");
+
+    TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
+    status.setRunState(TaskStatus.State.FAILED);
+    activeTasks.remove(taskid);
+  }
 
   private void setSuccessfulTaskid(TaskAttemptID taskid) {
     this.successfulTaskId = taskid;
@@ -242,7 +250,6 @@ class TaskInProgress {
   }
 
   public void kill() {
-    LOG.debug(">> TaskInProgress.kill() step.");
     this.failed = true;
   }
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLog.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLog.java?rev=1076130&r1=1076129&r2=1076130&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLog.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLog.java Wed Mar  2 07:46:33 2011
@@ -30,6 +30,9 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hama.HamaConfiguration;
 
+/**
+ * A simple logger to handle the task-specific user logs.
+ */
 public class TaskLog {
   private static final Log LOG = LogFactory.getLog(TaskLog.class.getName());
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLogAppender.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLogAppender.java?rev=1076130&r1=1076129&r2=1076130&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLogAppender.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskLogAppender.java Wed Mar  2 07:46:33 2011
@@ -23,6 +23,9 @@ import java.util.Queue;
 import org.apache.log4j.FileAppender;
 import org.apache.log4j.spi.LoggingEvent;
 
+/**
+ * A simple log4j-appender for the task child's BSP system logs.
+ */
 public class TaskLogAppender extends FileAppender {
   private String taskId; // taskId should be managed as String rather than
                          // TaskID object

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java?rev=1076130&r1=1076129&r2=1076130&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java Wed Mar  2 07:46:33 2011
@@ -30,7 +30,6 @@ import org.apache.hadoop.io.WritableUtil
 /**
  * Describes the current status of a task. This is not intended to be a
  * comprehensive piece of data.
- * 
  */
 class TaskStatus implements Writable, Cloneable {
   static final Log LOG = LogFactory.getLog(TaskStatus.class);



Mime
View raw message