hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From surajsme...@apache.org
Subject svn commit: r1369551 [1/3] - in /hama/branches/HAMA-505-branch: conf/ core/src/main/java/org/apache/hama/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/ft/ core/src/main/java/org/apache/hama/bsp/message/ core/src/main/j...
Date Sun, 05 Aug 2012 11:07:49 GMT
Author: surajsmenon
Date: Sun Aug  5 11:07:48 2012
New Revision: 1369551

URL: http://svn.apache.org/viewvc?rev=1369551&view=rev
Log:
HAMA-557 HAMA-610 HAMA-611 HAMA-587 fixes committing to HAMA-505-branch

Added:
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/UpdatePeerAction.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/BSPFaultTolerantService.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantMasterService.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantPeerService.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/MessageEventListener.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/BSPResource.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/RawSplitResource.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/TaskAllocationStrategy.java
    hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestTaskAllocation.java
Modified:
    hama/branches/HAMA-505-branch/conf/hama-default.xml
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/Constants.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPTask.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/GroomServer.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobStatus.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/TaskStatus.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/MasterSyncClient.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncBSPMasterClient.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncClient.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
    hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
    hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java
    hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java

Modified: hama/branches/HAMA-505-branch/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/conf/hama-default.xml?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/conf/hama-default.xml (original)
+++ hama/branches/HAMA-505-branch/conf/hama-default.xml Sun Aug  5 11:07:48 2012
@@ -120,7 +120,12 @@
     <description>The maximum number of BSP tasks that will be run simultaneously 
     by a groom server.</description>
   </property>
-   <property>
+  <property>
+    <name>bsp.ft.enabled</name>
+    <value>false</value>
+    <description>Enable Fault Tolerance in BSP Task execution.</description>
+  </property>
+  <property>
     <name>bsp.checkpoint.enabled</name>
     <value>false</value>
     <description>Enable Hama to checkpoint the messages transferred among BSP tasks during the BSP synchronization period.</description>

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/Constants.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/Constants.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/Constants.java Sun Aug  5 11:07:48 2012
@@ -60,6 +60,24 @@ public interface Constants {
   public static final String UTF8_ENCODING = "UTF-8";
 
   public static final String MAX_TASKS_PER_GROOM = "bsp.tasks.maximum";
+  
+  public static final String MAX_TASK_ATTEMPTS = "bsp.tasks.max.attempts";
+
+  public static final int DEFAULT_MAX_TASK_ATTEMPTS = 2;
+
+  ////////////////////////////////////////
+  // Task scheduler related constants
+  // //////////////////////////////////////
+  
+  public static final String TASK_ALLOCATOR_CLASS = "bsp.taskalloc.class";
+  
+  // //////////////////////////////////////
+  // Fault tolerance related constants
+  // //////////////////////////////////////
+
+  public static final String FAULT_TOLERANCE_FLAG = "bsp.ft.enabled";
+  
+  public static final String FAULT_TOLERANCE_CLASS = "bsp.ft.class";
 
   // //////////////////////////////////////
   // Checkpointing related constants

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Sun Aug  5 11:07:48 2012
@@ -1025,7 +1025,7 @@ public class BSPJobClient extends Config
     return tokens;
   }
 
-  static class RawSplit implements Writable {
+  public static class RawSplit implements Writable {
     private String splitClass;
     private BytesWritable bytes = new BytesWritable();
     private String[] locations;

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPMaster.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPMaster.java Sun Aug  5 11:07:48 2012
@@ -28,12 +28,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -48,7 +48,6 @@ import org.apache.hadoop.security.Access
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.sync.BSPMasterSyncClient;
 import org.apache.hama.bsp.sync.MasterSyncClient;
 import org.apache.hama.bsp.sync.ZKSyncBSPMasterClient;
 import org.apache.hama.http.HttpServer;
@@ -59,18 +58,23 @@ import org.apache.hama.ipc.MasterProtoco
 import org.apache.hama.monitor.fd.FDProvider;
 import org.apache.hama.monitor.fd.Supervisor;
 import org.apache.hama.monitor.fd.UDPSupervisor;
-import org.apache.hama.zookeeper.QuorumPeer;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
 
 /**
  * BSPMaster is responsible to control all the groom servers and to manage bsp
- * jobs.
+ * jobs. It has the following responsibilities:
+ * <ol>
+ * <li> <b>Job submission</b>. BSPMaster is responsible for accepting new job
+ * requests and handing each job to the scheduler, which schedules the BSP
+ * tasks defined for the job.
+ * <li> <b>GroomServer monitoring</b>. BSPMaster keeps track of all the groom
+ * servers in the cluster. It is responsible for adding new grooms to the
+ * cluster and keeping tabs on all the grooms; it may blacklist a groom if it
+ * fails the availability requirements.
+ * <li> <b>Task status tracking</b>. BSPMaster keeps track of the status of
+ * every task in each job and handles job failures as requested by the jobs.
+ * </ol>
  */
 public class BSPMaster implements JobSubmissionProtocol, MasterProtocol,
     GroomServerManager, Watcher, MonitorManager {
@@ -80,8 +84,6 @@ public class BSPMaster implements JobSub
   private static final int FS_ACCESS_RETRY_PERIOD = 10000;
 
   private HamaConfiguration conf;
-  ZooKeeper zk = null;
-  private String bspRoot = null;
   MasterSyncClient syncClient = null;
 
   /**
@@ -126,7 +128,7 @@ public class BSPMaster implements JobSub
 
   // Jobs' Meta Data
   private Integer nextJobId = Integer.valueOf(1);
-  // clients
+  private int totalSubmissions = 0; // jobs submitted by clients (also incremented on each task recovery)
   private int totalTasks = 0; // currnetly running tasks
   private int totalTaskCapacity; // max tasks that groom server can run
 
@@ -145,6 +147,13 @@ public class BSPMaster implements JobSub
 
   private final AtomicReference<Supervisor> supervisor = new AtomicReference<Supervisor>();
 
+  /**
+   * ReportGroomStatusHandler keeps track of the status that each GroomServer
+   * reports for the tasks it is currently executing. Based on the reported
+   * status, it issues task recovery requests, updates job progress, and
+   * performs other bookkeeping for the
+   * currently running jobs.
+   */
   private class ReportGroomStatusHandler implements DirectiveHandler {
 
     @Override
@@ -181,8 +190,13 @@ public class BSPMaster implements JobSub
               jip.getStatus().setProgress(ts.getSuperstepCount());
               jip.getStatus().setSuperstepCount(ts.getSuperstepCount());
             } else if (ts.getRunState() == TaskStatus.State.FAILED) {
-              jip.status.setRunState(JobStatus.FAILED);
-              jip.failedTask(tip, ts);
+              if(jip.handleFailure(tip)){
+                recoverTask(jip);
+              }
+              else {
+                jip.status.setRunState(JobStatus.FAILED);
+                jip.failedTask(tip, ts);
+              }
             }
             if (jip.getStatus().getRunState() == JobStatus.SUCCEEDED) {
               for (JobInProgressListener listener : jobInProgressListeners) {
@@ -196,6 +210,7 @@ public class BSPMaster implements JobSub
               jip.getStatus().setProgress(ts.getSuperstepCount());
               jip.getStatus().setSuperstepCount(ts.getSuperstepCount());
             } else if (jip.getStatus().getRunState() == JobStatus.KILLED) {
+              
               GroomProtocol worker = findGroomServer(tmpStatus);
               Directive d1 = new DispatchTasksDirective(
                   new GroomServerAction[] { new KillTaskAction(ts.getTaskId()) });
@@ -448,11 +463,26 @@ public class BSPMaster implements JobSub
     return conf.getLocalPath("bsp.local.dir", pathString);
   }
 
+  /**
+   * Starts the BSP Master process.
+   * @param conf The Hama configuration.
+   * @return an instance of BSPMaster
+   * @throws IOException
+   * @throws InterruptedException
+   */
   public static BSPMaster startMaster(HamaConfiguration conf)
       throws IOException, InterruptedException {
     return startMaster(conf, generateNewIdentifier());
   }
 
+  /**
+   * Starts the BSP Master process with the given identifier.
+   * @param conf The Hama configuration.
+   * @param identifier Identifier assigned to the new BSPMaster instance.
+   * @return an instance of BSPMaster
+   * @throws IOException
+   * @throws InterruptedException
+   */
   public static BSPMaster startMaster(HamaConfiguration conf, String identifier)
       throws IOException, InterruptedException {
     BSPMaster result = new BSPMaster(conf, identifier);
@@ -573,6 +603,11 @@ public class BSPMaster implements JobSub
 
     JobInProgress job = new JobInProgress(jobID, new Path(jobFile), this,
         this.conf);
+    ++totalSubmissions;
+    if(LOG.isDebugEnabled()){
+      LOG.debug("Submitting job number = " + totalSubmissions + 
+          " id = " + job.getJobID());
+    }
     return addJob(jobID, job);
   }
 
@@ -720,6 +755,21 @@ public class BSPMaster implements JobSub
     }
     return job.getStatus();
   }
+  
+  /**
+   * Recovers a failed task in the given job by notifying every registered
+   * JobInProgressListener, so that the task can be rescheduled on a machine.
+   */
+  private synchronized void recoverTask(JobInProgress job) {
+    ++totalSubmissions;
+    for (JobInProgressListener listener : jobInProgressListeners) {
+      try {
+        listener.recoverTaskInJob(job);
+      } catch (IOException ioe) {
+        LOG.error("Fail to alter Scheduler a job is added.", ioe);
+      }
+    }
+  }
 
   @Override
   public JobStatus[] jobsToComplete() throws IOException {
@@ -836,11 +886,14 @@ public class BSPMaster implements JobSub
     }
   }
 
+  /**
+   * Shuts down the BSPMaster process and performs the necessary cleanup.
+   */
   public void shutdown() {
     try {
-      this.zk.close();
-    } catch (InterruptedException e) {
-      e.printStackTrace();
+      this.syncClient.close();
+    } catch (IOException e) {
+      LOG.error("Error closing the sync client",e);
     }
     if (null != this.supervisor.get()) {
       this.supervisor.get().stop();

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Sun Aug  5 11:07:48 2012
@@ -19,30 +19,27 @@ package org.apache.hama.bsp;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.Constants;
 import org.apache.hama.bsp.Counters.Counter;
+import org.apache.hama.bsp.ft.AsyncRcvdMsgCheckpointImpl;
+import org.apache.hama.bsp.ft.BSPFaultTolerantService;
+import org.apache.hama.bsp.ft.FaultTolerantPeerService;
 import org.apache.hama.bsp.message.MessageManager;
 import org.apache.hama.bsp.message.MessageManagerFactory;
 import org.apache.hama.bsp.message.MessageQueue;
-import org.apache.hama.bsp.sync.BSPPeerSyncClient;
 import org.apache.hama.bsp.sync.PeerSyncClient;
-import org.apache.hama.bsp.sync.SyncClient;
-import org.apache.hama.bsp.sync.SyncEventListener;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.bsp.sync.SyncServiceFactory;
 import org.apache.hama.ipc.BSPPeerProtocol;
@@ -57,10 +54,7 @@ public final class BSPPeerImpl<K1, V1, K
   private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
 
   public static enum PeerCounter {
-    SUPERSTEP_SUM, SUPERSTEPS, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS,
-    IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED,
-    TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT,
-    COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
+    SUPERSTEP_SUM, SUPERSTEPS, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS, IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED, TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT, COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
   }
 
   private final Configuration conf;
@@ -78,10 +72,6 @@ public final class BSPPeerImpl<K1, V1, K
   private PeerSyncClient syncClient;
   private MessageManager<M> messenger;
 
-  // A checkpoint is initiated at the <checkPointInterval>th interval.
-  private int checkPointInterval;
-  private long lastCheckPointStep;
-
   // IO
   private int partition;
   private String splitClass;
@@ -96,6 +86,8 @@ public final class BSPPeerImpl<K1, V1, K
   private Counters counters;
   private Combiner<M> combiner;
 
+  private FaultTolerantPeerService<M> faultToleranceService;
+
   /**
    * Protected default constructor for LocalBSPRunner.
    */
@@ -127,6 +119,13 @@ public final class BSPPeerImpl<K1, V1, K
     this.counters = counters;
   }
 
+  public BSPPeerImpl(BSPJob job, Configuration conf, TaskAttemptID taskId,
+      BSPPeerProtocol umbilical, int partition, String splitClass,
+      BytesWritable split, Counters counters) throws Exception {
+    this(job, conf, taskId, umbilical, partition, splitClass, split, counters,
+        -1, TaskStatus.State.RUNNING);
+  }
+
   /**
    * BSPPeer Constructor.
    * 
@@ -140,7 +139,8 @@ public final class BSPPeerImpl<K1, V1, K
   @SuppressWarnings("unchecked")
   public BSPPeerImpl(BSPJob job, Configuration conf, TaskAttemptID taskId,
       BSPPeerProtocol umbilical, int partition, String splitClass,
-      BytesWritable split, Counters counters) throws Exception {
+      BytesWritable split, Counters counters, long superstep,
+      TaskStatus.State state) throws Exception {
     this.conf = conf;
     this.taskId = taskId;
     this.umbilical = umbilical;
@@ -153,28 +153,29 @@ public final class BSPPeerImpl<K1, V1, K
 
     this.fs = FileSystem.get(conf);
 
-    this.checkPointInterval = conf.getInt(Constants.CHECKPOINT_INTERVAL,
-        Constants.DEFAULT_CHECKPOINT_INTERVAL);
-    this.lastCheckPointStep = 0;
-
     String bindAddress = conf.get(Constants.PEER_HOST,
         Constants.DEFAULT_PEER_HOST);
     int bindPort = conf
         .getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
     peerAddress = new InetSocketAddress(bindAddress, bindPort);
-    initialize();
-    syncClient.register(taskId.getJobID(), taskId, peerAddress.getHostName(),
-        peerAddress.getPort());
-    // initial barrier syncing to get all the hosts to the same point, to get
-    // consistent peernames.
-    syncClient.enterBarrier(taskId.getJobID(), taskId, -1);
-    syncClient.leaveBarrier(taskId.getJobID(), taskId, -1);
-    setCurrentTaskStatus(new TaskStatus(taskId.getJobID(), taskId, 1.0f,
-        TaskStatus.State.RUNNING, "running", peerAddress.getHostName(),
-        TaskStatus.Phase.STARTING, counters));
 
-    messenger = MessageManagerFactory.getMessageManager(conf);
-    messenger.init(taskId, this, conf, peerAddress);
+    initializeIO();
+    initializeSyncService(superstep, state);
+
+    TaskStatus.Phase phase = TaskStatus.Phase.STARTING;
+    String stateString = "running";
+    if (state == TaskStatus.State.RECOVERING) {
+      phase = TaskStatus.Phase.RECOVERING;
+      stateString = "recovering";
+    }
+
+    setCurrentTaskStatus(new TaskStatus(taskId.getJobID(), taskId, 1.0f, state,
+        stateString, peerAddress.getHostName(), phase, counters));
+
+    initilizeMessaging();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Initialized Messaging service.");
+    }
 
     final String combinerName = conf.get("bsp.combiner.class");
     if (combinerName != null) {
@@ -182,12 +183,57 @@ public final class BSPPeerImpl<K1, V1, K
           conf.getClassByName(combinerName), conf);
     }
 
+    if (conf.getBoolean(Constants.FAULT_TOLERANCE_FLAG, false)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Fault tolerance enabled.");
+      }
+      if (superstep > 0)
+        conf.setInt("attempt.superstep", (int) superstep);
+      Class<?> ftClass = conf.getClass(Constants.FAULT_TOLERANCE_CLASS,
+          AsyncRcvdMsgCheckpointImpl.class, BSPFaultTolerantService.class);
+      if (ftClass != null) {
+        if (superstep > 0) {
+          counters.incrCounter(PeerCounter.SUPERSTEP_SUM, superstep);
+        }
+
+        this.faultToleranceService = ((BSPFaultTolerantService<M>) ReflectionUtils
+            .newInstance(ftClass, null)).constructPeerFaultTolerance(job, this,
+            syncClient, peerAddress, this.taskId, superstep, conf, messenger);
+        TaskStatus.State newState = this.faultToleranceService
+            .onPeerInitialized(state);
+
+        if (state == TaskStatus.State.RECOVERING) {
+          if (newState == TaskStatus.State.RUNNING) {
+            phase = TaskStatus.Phase.STARTING;
+            stateString = "running";
+            state = newState;
+          }
+
+          setCurrentTaskStatus(new TaskStatus(taskId.getJobID(), taskId, 1.0f,
+              state, stateString, peerAddress.getHostName(), phase, counters));
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("State after FT service initialization - "
+                + newState.toString());
+          }
+
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Initialized fault tolerance service");
+        }
+      }
+    }
+    doFirstSync(superstep);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.info(new StringBuffer("BSP Peer successfully initialized for ")
+          .append(this.taskId.toString()).append(" ").append(superstep)
+          .toString());
+    }
   }
 
   @SuppressWarnings("unchecked")
   public final void initialize() throws Exception {
-    syncClient = SyncServiceFactory.getPeerSyncClient(conf);
-    syncClient.init(conf, taskId.getJobID(), taskId);
 
     initInput();
 
@@ -239,60 +285,59 @@ public final class BSPPeerImpl<K1, V1, K
     }
   }
 
-  @Override
-  public final M getCurrentMessage() throws IOException {
-    return messenger.getCurrentMessage();
+  public final void initilizeMessaging() throws ClassNotFoundException {
+    messenger = MessageManagerFactory.getMessageManager(conf);
+    messenger.init(taskId, this, conf, peerAddress);
   }
 
-  @Override
-  public final void send(String peerName, M msg) throws IOException {
-    incrementCounter(PeerCounter.TOTAL_MESSAGES_SENT, 1L);
-    messenger.send(peerName, msg);
+  public final void initializeSyncService(long superstep, TaskStatus.State state)
+      throws Exception {
+
+    syncClient = SyncServiceFactory.getPeerSyncClient(conf);
+    syncClient.init(conf, taskId.getJobID(), taskId);
+    syncClient.register(taskId.getJobID(), taskId, peerAddress.getHostName(),
+        peerAddress.getPort());
   }
 
-  /*
-   * returns true if the peer would checkpoint in the next sync.
-   */
-  public final boolean isReadyToCheckpoint() {
+  private void doFirstSync(long superstep) throws SyncException {
+    if (superstep > 0)
+      --superstep;
+    syncClient.enterBarrier(taskId.getJobID(), taskId, superstep);
+    syncClient.leaveBarrier(taskId.getJobID(), taskId, superstep);
+  }
 
-    checkPointInterval = conf.getInt(Constants.CHECKPOINT_INTERVAL, 1);
-    if (LOG.isDebugEnabled())
-      LOG.debug(new StringBuffer(1000).append("Enabled = ")
-          .append(conf.getBoolean(Constants.CHECKPOINT_ENABLED, false))
-          .append(" checkPointInterval = ").append(checkPointInterval)
-          .append(" lastCheckPointStep = ").append(lastCheckPointStep)
-          .append(" getSuperstepCount() = ").append(getSuperstepCount())
-          .toString());
+  @SuppressWarnings("unchecked")
+  public final void initializeIO() throws Exception {
+
+    initInput();
+
+    String outdir = null;
+    if (conf.get("bsp.output.dir") != null) {
+      Path outputDir = new Path(conf.get("bsp.output.dir",
+          "tmp-" + System.currentTimeMillis()), Task.getOutputName(partition));
+      outdir = outputDir.makeQualified(fs).toString();
+    }
+    outWriter = bspJob.getOutputFormat().getRecordWriter(fs, bspJob, outdir);
+    final RecordWriter<K2, V2> finalOut = outWriter;
 
-    return (conf.getBoolean(Constants.CHECKPOINT_ENABLED, false)
-        && (checkPointInterval != 0) && (((int) (getSuperstepCount() - lastCheckPointStep)) >= checkPointInterval));
+    collector = new OutputCollector<K2, V2>() {
+      @Override
+      public void collect(K2 key, V2 value) throws IOException {
+        finalOut.write(key, value);
+      }
+    };
 
   }
 
-  private final String checkpointedPath() {
-    String backup = conf.get("bsp.checkpoint.prefix_path", "/checkpoint/");
-    String ckptPath = backup + bspJob.getJobID().toString() + "/"
-        + getSuperstepCount() + "/" + this.taskId.toString();
-    if (LOG.isDebugEnabled())
-      LOG.debug("Messages are to be saved to " + ckptPath);
-    return ckptPath;
+  @Override
+  public final M getCurrentMessage() throws IOException {
+    return messenger.getCurrentMessage();
   }
 
-  final void checkpoint(String checkpointedPath, BSPMessageBundle<M> bundle) {
-    FSDataOutputStream out = null;
-    try {
-      out = this.fs.create(new Path(checkpointedPath));
-      bundle.write(out);
-    } catch (IOException ioe) {
-      LOG.warn("Fail checkpointing messages to " + checkpointedPath, ioe);
-    } finally {
-      try {
-        if (null != out)
-          out.close();
-      } catch (IOException e) {
-        LOG.warn("Fail to close dfs output stream while checkpointing.", e);
-      }
-    }
+  @Override
+  public final void send(String peerName, M msg) throws IOException {
+    incrementCounter(PeerCounter.TOTAL_MESSAGES_SENT, 1L);
+    messenger.send(peerName, msg);
   }
 
   /*
@@ -302,34 +347,44 @@ public final class BSPPeerImpl<K1, V1, K
   @Override
   public final void sync() throws IOException, SyncException,
       InterruptedException {
-    long startBarrier = System.currentTimeMillis();
-    enterBarrier();
+
     // normally all messages should been send now, finalizing the send phase
     messenger.finishSendPhase();
     Iterator<Entry<InetSocketAddress, MessageQueue<M>>> it = messenger
         .getMessageIterator();
 
-    boolean shouldCheckPoint;
-
-    if ((shouldCheckPoint = isReadyToCheckpoint())) {
-      lastCheckPointStep = getSuperstepCount();
-    }
-
     while (it.hasNext()) {
       Entry<InetSocketAddress, MessageQueue<M>> entry = it.next();
       final InetSocketAddress addr = entry.getKey();
       final Iterable<M> messages = entry.getValue();
 
       final BSPMessageBundle<M> bundle = combineMessages(messages);
+      // remove this message during runtime to save a bit of memory
+      it.remove();
+      try {
+        messenger.transfer(addr, bundle);
+      } catch (Exception e) {
+        LOG.error("Error while sending messages", e);
+      }
+    }
 
-      if (shouldCheckPoint) {
-        checkpoint(checkpointedPath(), bundle);
+    if (this.faultToleranceService != null) {
+      try {
+        this.faultToleranceService.beforeBarrier();
+      } catch (Exception e) {
+        throw new IOException(e);
       }
+    }
 
-      // remove this message during runtime to save a bit of memory
-      it.remove();
+    long startBarrier = System.currentTimeMillis();
+    enterBarrier();
 
-      messenger.transfer(addr, bundle);
+    if (this.faultToleranceService != null) {
+      try {
+        this.faultToleranceService.duringBarrier();
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
     }
 
     leaveBarrier();
@@ -340,9 +395,38 @@ public final class BSPPeerImpl<K1, V1, K
 
     currentTaskStatus.setCounters(counters);
 
+    if (this.faultToleranceService != null) {
+      try {
+        this.faultToleranceService.afterBarrier();
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+
     umbilical.statusUpdate(taskId, currentTaskStatus);
     // Clear outgoing queues.
     messenger.clearOutgoingQueues();
+
+    // int msgsCount = -1;
+    // if (shouldCheckPoint) {
+    // msgsCount = checkpointReceivedMessages(checkpointedReceivePath());
+    // }
+    //
+    // this.syncClient.storeInformation(this.syncClient.constructKey(
+    // this.bspJob.getJobID(), "checkpoint", String.valueOf(getPeerIndex())),
+    // new IntWritable(msgsCount), false, null);
+
+    // if (msgsCount >= 0) {
+    // ArrayWritable writableArray = new ArrayWritable(IntWritable.class);
+    // Writable[] writeArr = new Writable[2];
+    // writeArr[0] = new IntWritable((int) getSuperstepCount());
+    // writeArr[1] = new IntWritable(msgsCount);
+    // writableArray.set(writeArr);
+    // this.syncClient.storeInformation(
+    // this.syncClient.constructKey(this.bspJob.getJobID(), "checkpoint",
+    // String.valueOf(getPeerIndex())), writableArray, true, null);
+    // }
+
   }
 
   private final BSPMessageBundle<M> combineMessages(Iterable<M> messages) {
@@ -424,8 +508,7 @@ public final class BSPPeerImpl<K1, V1, K
 
   @Override
   public int getPeerIndex() {
-    initPeerNames();
-    return Arrays.binarySearch(getAllPeerNames(), getPeerName());
+    return this.taskId.getTaskID().getId();
   }
 
   @Override

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPTask.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPTask.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPTask.java Sun Aug  5 11:07:48 2012
@@ -69,7 +69,7 @@ public final class BSPTask extends Task 
       boolean shouldKillSelf = false;
       try {
         if (LOG.isDebugEnabled())
-          LOG.debug("Pinging at time " + Calendar.getInstance().toString());
+          LOG.debug("Pinging at time " + Calendar.getInstance().getTimeInMillis());
         // if the RPC call returns false, it means that groomserver does not
         // have knowledge of this task.
         shouldKillSelf = !(pingRPC.ping(taskId) && bspThread.isAlive());

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/GroomServer.java Sun Aug  5 11:07:48 2012
@@ -163,19 +163,20 @@ public class GroomServer implements Runn
       }
 
       if (actions != null) {
-        assignedPeerNames = new HashMap<TaskAttemptID, Integer>();
+        // assignedPeerNames = new HashMap<TaskAttemptID, Integer>();
         int prevPort = Constants.DEFAULT_PEER_PORT;
 
         for (GroomServerAction action : actions) {
           if (action instanceof LaunchTaskAction) {
             Task t = ((LaunchTaskAction) action).getTask();
 
-            prevPort = BSPNetUtils.getNextAvailable(prevPort);
-            assignedPeerNames.put(t.getTaskID(), prevPort);
-
+            synchronized (assignedPeerNames) {
+              prevPort = BSPNetUtils.getNextAvailable(prevPort);
+              assignedPeerNames.put(t.getTaskID(), prevPort);
+            }
             LOG.info("Launch " + actions.length + " tasks.");
             startNewTask((LaunchTaskAction) action);
-          } else {
+          } else if (action instanceof KillTaskAction) {
 
             // TODO Use the cleanup thread
             // tasksToCleanup.put(action);
@@ -187,11 +188,30 @@ public class GroomServer implements Runn
               tip.taskStatus.setRunState(TaskStatus.State.FAILED);
               try {
                 tip.killAndCleanup(false);
+                tasks.remove(killAction.getTaskID());
+                runningTasks.remove(killAction.getTaskID());
               } catch (IOException ioe) {
                 throw new DirectiveException("Error when killing a "
                     + "TaskInProgress.", ioe);
               }
             }
+          } else if (action instanceof RecoverTaskAction) {
+            // Recovery: relaunch a failed task, reserving a fresh peer port
+            // for the new attempt exactly as the LaunchTaskAction branch does.
+            RecoverTaskAction recoverAction = (RecoverTaskAction) action;
+            Task t = recoverAction.getTask();
+            LOG.info("Recovery action task. " + t.getTaskID());
+            synchronized (assignedPeerNames) {
+              prevPort = BSPNetUtils.getNextAvailable(prevPort);
+              assignedPeerNames.put(t.getTaskID(), prevPort);
+            }
+            try {
+              startRecoveryTask(recoverAction);
+            } catch (IOException e) {
+              // Keep the cause so the master can see why the recovery failed.
+              throw new DirectiveException("Error starting the recovery task "
+                  + t.getTaskID(), e);
+            }
           }
         }
       }
@@ -321,6 +341,8 @@ public class GroomServer implements Runn
     this.conf.set(Constants.PEER_HOST, localHostname);
     this.conf.set(Constants.GROOM_RPC_HOST, localHostname);
     this.maxCurrentTasks = conf.getInt(Constants.MAX_TASKS_PER_GROOM, 3);
+    this.assignedPeerNames = new HashMap<TaskAttemptID, Integer>(
+        2 * this.maxCurrentTasks);
 
     int rpcPort = -1;
     String rpcAddr = null;
@@ -571,6 +593,61 @@ public class GroomServer implements Runn
     }
   }
 
+  private void startRecoveryTask(RecoverTaskAction action) throws IOException {
+    Task t = action.getTask();
+    BSPJob jobConf;
+    try {
+      jobConf = new BSPJob(t.getJobID(), t.getJobFile());
+    } catch (IOException e1) {
+      LOG.error("Error loading the job configuration for " + t.getTaskID(), e1);
+      throw e1;
+    }
+    // Flag the attempt as a recovery starting at the action's superstep count.
+    TaskInProgress tip = new TaskInProgress(t, jobConf, this.groomServerName);
+    tip.markAsRecoveryTask(action.getSuperstepCount());
+    synchronized (this) {
+      if (tasks.containsKey(t.getTaskID())) {
+        TaskInProgress oldTip = tasks.get(t.getTaskID());
+        try {
+          oldTip.killRunner();
+        } catch (IOException e) {
+          LOG.error("Error killing the current process for " + t.getTaskID(), e);
+          throw e;
+        }
+      }
+      // Drop every earlier attempt that belongs to the same logical task.
+      Iterator<TaskAttemptID> taskIterator = tasks.keySet().iterator();
+      while (taskIterator.hasNext()) {
+        TaskAttemptID taskAttId = taskIterator.next();
+        if (taskAttId.getTaskID().equals(t.getTaskID().getTaskID())) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Removing tasks with id = " + t.getTaskID().getTaskID());
+          }
+          taskIterator.remove();
+          runningTasks.remove(taskAttId);
+        }
+      }
+      // Register the new attempt before localizing it.
+      tasks.put(t.getTaskID(), tip);
+      runningTasks.put(t.getTaskID(), tip);
+    }
+    try {
+      localizeJob(tip);
+    } catch (Throwable e) {
+      String msg = ("Error initializing " + tip.getTask().getTaskID() + ":\n" + StringUtils
+          .stringifyException(e));
+      LOG.warn(msg);
+      // Best-effort cleanup; the localization failure itself is rethrown below.
+      try {
+        tip.killAndCleanup(true);
+      } catch (IOException ie2) {
+        LOG.info("Error cleaning up " + tip.getTask().getTaskID() + ":\n"
+            + StringUtils.stringifyException(ie2));
+      }
+      throw new IOException("Error localizing the job.", e);
+    }
+  }
+
   /**
    * Update and report refresh status back to BSPMaster.
    */
@@ -730,13 +807,20 @@ public class GroomServer implements Runn
             + " monitorPeriod = "
             + monitorPeriod
             + " check = "
-            + (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING) && (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))));
+            + (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING) && 
+                (((tip.lastPingedTimestamp == 0 && 
+                ((currentTime - tip.startTime) > 10 * monitorPeriod)) || 
+                ((tip.lastPingedTimestamp > 0) && 
+                    (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))));
 
       // Task is out of contact if it has not pinged since more than
       // monitorPeriod. A task is given a leeway of 10 times monitorPeriod
       // to get started.
       if (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING)
-          && (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))) {
+          && (((tip.lastPingedTimestamp == 0 
+          && ((currentTime - tip.startTime) > 10 * monitorPeriod)) 
+            || ((tip.lastPingedTimestamp > 0) 
+                && (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))) {
 
         LOG.info("adding purge task: " + tip.getTask().getTaskID());
 
@@ -891,6 +975,7 @@ public class GroomServer implements Runn
 
     private long startTime = 0L;
     private volatile long lastPingedTimestamp = 0L;
+    private long startSuperstepCount = -1;
 
     public TaskInProgress(Task task, BSPJob jobConf, String groomServer) {
       this.task = task;
@@ -901,6 +986,15 @@ public class GroomServer implements Runn
           TaskStatus.Phase.STARTING, task.getCounters());
     }
 
+    public void markAsRecoveryTask(long superstepNumber) {
+      if (this.taskStatus.getRunState() != TaskStatus.State.FAILED) {
+        this.taskStatus.setRunState(TaskStatus.State.RECOVERING);
+        this.taskStatus.setPhase(TaskStatus.Phase.RECOVERING);
+        this.taskStatus.setStateString("recovering");
+      }
+      this.startSuperstepCount = superstepNumber;
+    }
+
     private void localizeTask(Task task) throws IOException {
       Path localJobFile = this.jobConf.getLocalPath(SUBDIR + "/"
           + task.getTaskID() + "/job.xml");
@@ -954,8 +1048,22 @@ public class GroomServer implements Runn
 
       // runner could be null if task-cleanup attempt is not localized yet
       if (runner != null) {
+        if(LOG.isDebugEnabled()){
+          LOG.debug("Killing process for " + this.task.getTaskID());
+        }
+        runner.killBsp();
+      }
+      runner = null;
+    }
+
+    public synchronized void killRunner() throws IOException {
+      if (runner != null) {
+        if(LOG.isDebugEnabled()){
+          LOG.debug("Killing process for " + this.task.getTaskID());
+        }
         runner.killBsp();
       }
+      runner = null;
     }
 
     /**
@@ -1143,6 +1251,11 @@ public class GroomServer implements Runn
         defaultConf.setInt("bsp.checkpoint.port", Integer.parseInt(args[4]));
       }
       defaultConf.setInt(Constants.PEER_PORT, peerPort);
+      
+      long superstep = Long.parseLong(args[4]);
+      TaskStatus.State state = TaskStatus.State.valueOf(args[5]);
+      LOG.debug("Starting peer for sstep " + superstep + " state = " + state);
+
 
       try {
         // use job-specified working directory
@@ -1153,7 +1266,7 @@ public class GroomServer implements Runn
         @SuppressWarnings("rawtypes")
         final BSPPeerImpl<?, ?, ?, ?, ?> bspPeer = new BSPPeerImpl(job,
             defaultConf, taskid, umbilical, task.partition, task.splitClass,
-            task.split, task.getCounters());
+            task.split, task.getCounters(), superstep, state);
 
         task.run(job, bspPeer, umbilical); // run the task
 
@@ -1195,6 +1308,24 @@ public class GroomServer implements Runn
     }
   }
 
+  public TaskStatus getTaskStatus(TaskAttemptID taskid) {
+    TaskInProgress tip = tasks.get(taskid);
+    if (tip != null) {
+      return tip.getStatus();
+    } else {
+      return null;
+    }
+  }
+
+  public long getStartSuperstep(TaskAttemptID taskid) {
+    TaskInProgress tip = tasks.get(taskid);
+    if (tip != null) {
+      return tip.startSuperstepCount;
+    } else {
+      return -1L;
+    }
+  }
+
   @Override
   public boolean ping(TaskAttemptID taskid) throws IOException {
     TaskInProgress tip = runningTasks.get(taskid);
@@ -1220,8 +1351,6 @@ public class GroomServer implements Runn
   @Override
   public void fsError(TaskAttemptID taskId, String message) throws IOException {
     LOG.fatal("Task: " + taskId + " - Killed due to FSError: " + message);
-    // TODO
-
   }
 
   @Override
@@ -1254,8 +1383,6 @@ public class GroomServer implements Runn
 
   @Override
   public void process(WatchedEvent event) {
-    // TODO Auto-generated method stub
-
   }
 
 }

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java Sun Aug  5 11:07:48 2012
@@ -28,7 +28,7 @@ import org.apache.hadoop.io.WritableUtil
  * A generic directive from the {@link org.apache.hama.bsp.BSPMaster} to the
  * {@link org.apache.hama.bsp.GroomServer} to take some 'action'.
  */
-abstract class GroomServerAction implements Writable {
+public abstract class GroomServerAction implements Writable {
 
   /**
    * Ennumeration of various 'actions' that the {@link BSPMaster} directs the
@@ -49,7 +49,13 @@ abstract class GroomServerAction impleme
     REINIT_GROOM,
 
     /** Ask a task to save its output. */
-    COMMIT_TASK
+    COMMIT_TASK,
+
+    /** Recover a task from failure. */
+    RECOVER_TASK,
+
+    /** Update information on a peer. */
+    UPDATE_PEER
   };
 
   /**
@@ -73,7 +79,17 @@ abstract class GroomServerAction impleme
       case KILL_JOB: {
         action = new KillJobAction();
       }
-        break;
+      break;
+      case RECOVER_TASK:
+      {
+        action = new RecoverTaskAction();
+      }
+      break;
+      case UPDATE_PEER:
+      {
+        action = new UpdatePeerAction();
+      }
+      break;
       case REINIT_GROOM: {
         action = new ReinitGroomAction();
       }

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobInProgress.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobInProgress.java Sun Aug  5 11:07:48 2012
@@ -21,9 +21,11 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,14 +34,21 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hama.Constants;
+import org.apache.hama.bsp.ft.AsyncRcvdMsgCheckpointImpl;
+import org.apache.hama.bsp.ft.BSPFaultTolerantService;
+import org.apache.hama.bsp.ft.FaultTolerantMasterService;
 import org.apache.hama.bsp.sync.MasterSyncClient;
+import org.apache.hama.bsp.taskallocation.BSPResource;
+import org.apache.hama.bsp.taskallocation.BestEffortDataLocalTaskAllocator;
+import org.apache.hama.bsp.taskallocation.TaskAllocationStrategy;
+import org.apache.hama.util.ReflectionUtils;
 
 /**
  * JobInProgress maintains all the info for keeping a Job on the straight and
  * narrow. It keeps its JobProfile and its latest JobStatus, plus a set of
  * tables for doing bookkeeping of its Tasks.ss
  */
-class JobInProgress {
+public class JobInProgress {
 
   /**
    * Used when the a kill is issued to a job which is initializing.
@@ -74,6 +83,8 @@ class JobInProgress {
   long launchTime;
   long finishTime;
 
+  int maxTaskAttempts;
+
   private String jobName;
 
   // private LocalFileSystem localFs;
@@ -89,11 +100,38 @@ class JobInProgress {
   String jobSplit;
 
   Map<Task, GroomServerStatus> taskToGroomMap;
+
   // Used only for scheduling!
-  Map<GroomServerStatus, Integer> tasksInGroomMap;
+  Map<GroomServerStatus, Integer> taskCountInGroomMap;
+
+  // If the task does not exist as key, it implies that the task did not fail
+  // before.
+  // Value in the map implies the attempt ID for which the key(task) was
+  // re-attempted before.
+  Map<Task, Integer> taskReattemptMap;
+
+  Set<TaskInProgress> recoveryTasks;
+
+  // This set keeps track of the tasks that have failed.
+  Set<Task> failedTasksTillNow;
 
   private int taskCompletionEventTracker = 0;
 
+  private TaskAllocationStrategy taskAllocationStrategy;
+
+  private FaultTolerantMasterService faultToleranceService;
+  
+  /**
+   * Constructor used only by unit tests; no master is attached.
+   * @param jobId id of the job
+   * @param conf job configuration
+   */
+  public JobInProgress(BSPJobID jobId, Configuration conf){
+    this.conf = conf;
+    this.jobId = jobId;
+    master = null;
+  }
+
   public JobInProgress(BSPJobID jobId, Path jobFile, BSPMaster master,
       Configuration conf) throws IOException {
     this.conf = conf;
@@ -102,10 +140,6 @@ class JobInProgress {
     this.jobFile = jobFile;
     this.master = master;
 
-    this.taskToGroomMap = new HashMap<Task, GroomServerStatus>(2 * tasks.length);
-
-    this.tasksInGroomMap = new HashMap<GroomServerStatus, Integer>();
-
     this.status = new JobStatus(jobId, null, 0L, 0L,
         JobStatus.State.PREP.value(), counters);
     this.startTime = System.currentTimeMillis();
@@ -127,6 +161,9 @@ class JobInProgress {
     this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>(
         numBSPTasks + 10);
 
+    this.maxTaskAttempts = job.getConf().getInt(Constants.MAX_TASK_ATTEMPTS,
+        Constants.DEFAULT_MAX_TASK_ATTEMPTS);
+
     this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(),
         job.getJobName());
 
@@ -140,6 +177,8 @@ class JobInProgress {
       fs.copyToLocalFile(new Path(jarFile), localJarFile);
     }
 
+    failedTasksTillNow = new HashSet<Task>(2 * tasks.length);
+
   }
 
   public JobProfile getProfile() {
@@ -229,15 +268,20 @@ class JobInProgress {
       this.tasks = new TaskInProgress[numBSPTasks];
       for (int i = 0; i < numBSPTasks; i++) {
         tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
-            splits[i], this.master, this.conf, this, i);
+            splits[i], this.conf, this, i);
       }
     } else {
       this.tasks = new TaskInProgress[numBSPTasks];
       for (int i = 0; i < numBSPTasks; i++) {
         tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
-            null, this.master, this.conf, this, i);
+            null, this.conf, this, i);
       }
     }
+    this.taskToGroomMap = new HashMap<Task, GroomServerStatus>(2 * tasks.length);
+
+    this.taskCountInGroomMap = new HashMap<GroomServerStatus, Integer>();
+
+    this.recoveryTasks = new HashSet<TaskInProgress>(2 * tasks.length);
 
     // Update job status
     this.status = new JobStatus(this.status.getJobID(), this.profile.getUser(),
@@ -248,6 +292,31 @@ class JobInProgress {
     syncClient.registerJob(this.getJobID().toString());
 
     tasksInited = true;
+
+    Class<?> taskAllocatorClass = conf.getClass(Constants.TASK_ALLOCATOR_CLASS,
+        BestEffortDataLocalTaskAllocator.class, TaskAllocationStrategy.class);
+    this.taskAllocationStrategy = (TaskAllocationStrategy) ReflectionUtils
+        .newInstance(taskAllocatorClass, new Object[0]);
+
+    if (conf.getBoolean(Constants.FAULT_TOLERANCE_FLAG, false)) {
+
+      Class<?> ftClass = conf.getClass(Constants.FAULT_TOLERANCE_CLASS, 
+          AsyncRcvdMsgCheckpointImpl.class ,
+          BSPFaultTolerantService.class);
+      if (ftClass != null) {
+        try {
+          faultToleranceService = ((BSPFaultTolerantService<?>) ReflectionUtils
+              .newInstance(ftClass, new Object[0]))
+              .constructMasterFaultTolerance(jobId, maxTaskAttempts, tasks,
+                  conf, master.getSyncClient(), taskAllocationStrategy);
+          LOG.info("Initialized fault tolerance service with "
+              + ftClass.getCanonicalName());
+        } catch (Exception e) {
+          throw new IOException(e);
+        }
+      }
+    }
+
     LOG.info("Job is initialized.");
   }
 
@@ -269,29 +338,54 @@ class JobInProgress {
     }
 
     Task result = null;
+    BSPResource[] resources = new BSPResource[0];
 
-    try {
-      for (int i = 0; i < tasks.length; i++) {
-        if (!tasks[i].isRunning() && !tasks[i].isComplete()) {
-          result = tasks[i].getTaskToRun(groomStatuses, tasksInGroomMap);
-          if (result != null)
-            this.taskToGroomMap.put(result, tasks[i].getGroomServerStatus());
-          int taskInGroom = 0;
-          if (tasksInGroomMap.containsKey(tasks[i].getGroomServerStatus())) {
-            taskInGroom = tasksInGroomMap.get(tasks[i].getGroomServerStatus());
-          }
-          tasksInGroomMap.put(tasks[i].getGroomServerStatus(), taskInGroom + 1);
-          break;
+    for (int i = 0; i < tasks.length; i++) {
+      if (!tasks[i].isRunning() && !tasks[i].isComplete()) {
+
+        String[] selectedGrooms = taskAllocationStrategy.selectGrooms(
+            groomStatuses, taskCountInGroomMap, resources, tasks[i]);
+        GroomServerStatus groomStatus = taskAllocationStrategy
+            .getGroomToAllocate(groomStatuses, selectedGrooms,
+                taskCountInGroomMap, resources, tasks[i]);
+        if (groomStatus != null)
+          result = tasks[i].constructTask(groomStatus);
+        if (result != null) {
+          updateGroomTaskDetails(tasks[i].getGroomServerStatus(), result);
         }
+        break;
       }
-
-    } catch (IOException e) {
-      LOG.error("Exception while obtaining new task!", e);
     }
+
     counters.incrCounter(JobCounter.LAUNCHED_TASKS, 1L);
     return result;
   }
 
+  public void recoverTasks(Map<String, GroomServerStatus> groomStatuses,
+      Map<GroomServerStatus, List<GroomServerAction>> actionMap)
+      throws IOException {
+    // The service stays null unless fault tolerance was enabled for this job,
+    // in which case there is nothing to recover.
+    if (this.faultToleranceService == null) {
+      return;
+    }
+    // Drain the pending recovery set and delegate to the fault tolerance
+    // service; presumably it fills actionMap with per-groom actions (verify).
+    // Any IOException propagates unchanged to the caller.
+    this.faultToleranceService.recoverTasks(this, groomStatuses,
+        fetchAndClearTasksToRecover(), tasks, taskCountInGroomMap, actionMap);
+  }
+
+  private void updateGroomTaskDetails(GroomServerStatus groomStatus, Task task) {
+    taskToGroomMap.put(task, groomStatus);
+    int tasksInGroom = 0;
+
+    if (taskCountInGroomMap.containsKey(groomStatus)) {
+      tasksInGroom = taskCountInGroomMap.get(groomStatus);
+    }
+    taskCountInGroomMap.put(groomStatus, tasksInGroom + 1);
+  }
+
   /**
    * Hosts that tasks run on.
    * 
@@ -305,6 +399,14 @@ class JobInProgress {
     return list;
   }
 
+  /**
+   * Mark the completed task status. If all the tasks are completed the status
+   * of the job is updated to notify the client on the completion of the whole
+   * job.
+   * 
+   * @param tip <code>TaskInProgress</code> object representing task.
+   * @param status The completed task status
+   */
   public synchronized void completedTask(TaskInProgress tip, TaskStatus status) {
     TaskAttemptID taskid = status.getTaskId();
     updateTaskStatus(tip, status);
@@ -339,6 +441,12 @@ class JobInProgress {
     }
   }
 
+  /**
+   * Mark failure of a task.
+   * 
+   * @param tip <code>TaskInProgress</code> object representing task.
+   * @param status The failed task status
+   */
   public void failedTask(TaskInProgress tip, TaskStatus status) {
     TaskAttemptID taskid = status.getTaskId();
     updateTaskStatus(tip, status);
@@ -354,8 +462,6 @@ class JobInProgress {
       }
     }
 
-    // TODO
-
     if (!allDone) {
       // Kill job
       this.kill();
@@ -372,6 +478,12 @@ class JobInProgress {
     }
   }
 
+  /**
+   * Updates the task status of the task.
+   * 
+   * @param tip <code>TaskInProgress</code> representing task
+   * @param taskStatus The status of the task.
+   */
   public synchronized void updateTaskStatus(TaskInProgress tip,
       TaskStatus taskStatus) {
     TaskAttemptID taskid = taskStatus.getTaskId();
@@ -415,6 +527,9 @@ class JobInProgress {
     }
   }
 
+  /**
+   * Kill the job.
+   */
   public synchronized void kill() {
     if (status.getRunState() != JobStatus.KILLED) {
       this.status = new JobStatus(status.getJobID(), this.profile.getUser(),
@@ -439,6 +554,12 @@ class JobInProgress {
    */
   synchronized void garbageCollect() {
     try {
+      
+      if(LOG.isDebugEnabled()){
+        LOG.debug("Removing " + localJobFile + " and " + localJarFile
+            + " getJobFile = " + profile.getJobFile());
+      }
+      
       // Definitely remove the local-disk copy of the job file
       if (localJobFile != null) {
         localFs.delete(localJobFile, true);
@@ -502,4 +623,62 @@ class JobInProgress {
     return events;
   }
 
+  /**
+   * Returns the configured maximum number of times the task could be
+   * re-attempted.
+   */
+  int getMaximumReAttempts() {
+    return maxTaskAttempts;
+  }
+
+  /**
+   * Returns true if the task should be restarted on failure; in that case the
+   * task is queued for recovery and the job is marked RECOVERING.
+   */
+  synchronized boolean handleFailure(TaskInProgress tip) {
+    if (this.faultToleranceService == null
+        || !faultToleranceService.isRecoveryPossible(tip)) {
+      return false;
+    }
+    if (faultToleranceService.isAlreadyRecovered(tip)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Avoiding recovery task " + tip.getCurrentTaskAttemptId());
+      }
+      return false;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Adding recovery task " + tip.getCurrentTaskAttemptId());
+    }
+    recoveryTasks.add(tip);
+    status.setRunState(JobStatus.RECOVERING);
+    return true;
+  }
+
+  /**
+   * Drains, under this object's lock, the set of tasks queued for recovery.
+   *
+   * @return the tasks in progress that have to be recovered; empty if none.
+   */
+  synchronized TaskInProgress[] fetchAndClearTasksToRecover() {
+    TaskInProgress[] failedTasksInProgress = recoveryTasks
+        .toArray(new TaskInProgress[recoveryTasks.size()]);
+    recoveryTasks.clear();
+    return failedTasksInProgress;
+  }
+
+  /** Returns true if at least one failed task is queued for recovery. */
+  public synchronized boolean isRecoveryPending() {
+    // Locked like handleFailure/fetchAndClear, which mutate this HashSet.
+    return !recoveryTasks.isEmpty();
+  }
+
+  /** Returns the live (mutable) key set of the task-to-groom map. */
+  public Set<Task> getTaskSet() {
+    return taskToGroomMap.keySet();
+  }
+
+  public FaultTolerantMasterService getFaultToleranceService() {
+    return this.faultToleranceService;
+  }
+
 }

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java Sun Aug  5 11:07:48 2012
@@ -40,5 +40,13 @@ abstract class JobInProgressListener {
    * @throws IOException
    */
   public abstract void jobRemoved(JobInProgress job) throws IOException;
+  
+  /**
+   * Invoked when tasks of a job have to be recovered by {@link BSPMaster}.
+   *
+   * @param job The job with tasks pending recovery.
+   * @throws IOException
+   */
+  public abstract void recoverTaskInJob(JobInProgress job) throws IOException;
 
 }

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobStatus.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobStatus.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobStatus.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobStatus.java Sun Aug  5 11:07:48 2012
@@ -44,7 +44,7 @@ public class JobStatus implements Writab
   }
 
   public static enum State {
-    RUNNING(1), SUCCEEDED(2), FAILED(3), PREP(4), KILLED(5);
+    RUNNING(1), SUCCEEDED(2), FAILED(3), PREP(4), KILLED(5), RECOVERING(6);
     int s;
 
     State(int s) {
@@ -74,6 +74,9 @@ public class JobStatus implements Writab
         case KILLED:
           name = "KILLED";
           break;
+        case RECOVERING:
+          name = "RECOVERING";
+          break;
       }
 
       return name;
@@ -86,6 +89,7 @@ public class JobStatus implements Writab
   public static final int FAILED = 3;
   public static final int PREP = 4;
   public static final int KILLED = 5;
+  public static final int RECOVERING = 6;
 
   private BSPJobID jobid;
   private long progress;

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java Sun Aug  5 11:07:48 2012
@@ -23,7 +23,7 @@ import java.io.IOException;
 
 /**
  * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the
- * {@link org.apache.hama.bsp.GroomServer} to launch a new task.
+ * {@link org.apache.hama.bsp.GroomServer} to launch a new task.
  */
 class LaunchTaskAction extends GroomServerAction {
   private Task task;

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Sun Aug  5 11:07:48 2012
@@ -46,6 +46,7 @@ import org.apache.hama.HamaConfiguration
 import org.apache.hama.bsp.BSPJobClient.RawSplit;
 import org.apache.hama.bsp.BSPMaster.State;
 import org.apache.hama.bsp.message.MemoryQueue;
+import org.apache.hama.bsp.message.MessageEventListener;
 import org.apache.hama.bsp.message.MessageManager;
 import org.apache.hama.bsp.message.MessageManagerFactory;
 import org.apache.hama.bsp.message.MessageQueue;
@@ -407,9 +408,30 @@ public class LocalBSPRunner implements J
 
     @Override
     public void finishSendPhase() throws IOException {
-      // TODO Auto-generated method stub
+    }
+
+    @Override
+    public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle) {
+      for (Writable value : bundle.getMessages()) {
+        loopBackMessage(value);
+      }
+      
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void loopBackMessage(Writable message) {
+      localIncomingMessages.add((M)message);
+      peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
+          1L);
+    }
 
+    @Override
+    public void registerListener(MessageEventListener<M> listener)
+        throws IOException {
     }
+    
+    
 
   }
 
@@ -528,39 +550,33 @@ public class LocalBSPRunner implements J
     }
 
     @Override
-    public void close() throws InterruptedException {
+    public void close() throws IOException {
       barrier = null;
     }
     @Override
-    public Writable getInformation(String key,
-        Class<? extends Writable> classType) {
-      // TODO Auto-generated method stub
-      return null;
+    public boolean getInformation(String key, Writable writableVal) {
+      return false;
     }
 
     @Override
     public String constructKey(BSPJobID jobId, String... args) {
-      // TODO Auto-generated method stub
       return null;
     }
 
     @Override
     public boolean storeInformation(String key, Writable value,
         boolean permanent, SyncEventListener listener) {
-      // TODO Auto-generated method stub
       return false;
     }
 
     @Override
     public boolean addKey(String key, boolean permanent,
         SyncEventListener listener) {
-      // TODO Auto-generated method stub
       return false;
     }
 
     @Override
     public boolean hasKey(String key) {
-      // TODO Auto-generated method stub
       return false;
     }
 
@@ -568,16 +584,19 @@ public class LocalBSPRunner implements J
     public boolean registerListener(String key,
         SyncEvent event,
         SyncEventListener listener) {
-      // TODO Auto-generated method stub
       return false;
     }
 
     @Override
     public String[] getChildKeySet(String key, SyncEventListener listener) {
-      // TODO Auto-generated method stub
       return null;
     }
 
+    @Override
+    public boolean remove(String key, SyncEventListener listener) {
+      return false;
+    }
+
 	
   }
 

Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java?rev=1369551&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java Sun Aug  5 11:07:48 2012
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the
+ * {@link org.apache.hama.bsp.GroomServer} to recover (relaunch) a task. Besides
+ * the task to relaunch, the directive carries a superstep number (presumably
+ * the superstep the recovered task resumes from; -1 when unset).
+ */
+public class RecoverTaskAction extends GroomServerAction {
+  /** The task to relaunch. */
+  private Task task;
+  /** Superstep number sent along with the task; -1 until assigned. */
+  private LongWritable superstepNumber;
+
+  /** Creates an empty action to be populated by readFields. */
+  public RecoverTaskAction() {
+    super(ActionType.RECOVER_TASK);
+    superstepNumber = new LongWritable(-1L);
+  }
+
+  /**
+   * @param task the task to relaunch; must be non-null before
+   *          {@link #write(DataOutput)} is called
+   * @param superstep the superstep number to send along with the task
+   */
+  public RecoverTaskAction(Task task, long superstep) {
+    super(ActionType.RECOVER_TASK);
+    this.task = task;
+    this.superstepNumber = new LongWritable(superstep);
+  }
+
+  /** @return the task to relaunch, or null if not set yet */
+  public Task getTask() {
+    return task;
+  }
+
+  /** @return the superstep number carried by this action (-1 if unset) */
+  public long getSuperstepCount() {
+    return superstepNumber.get();
+  }
+
+  /**
+   * Serializes the task followed by the superstep number. A
+   * NullPointerException is raised if no task has been set.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    task.write(out);
+    superstepNumber.write(out);
+  }
+
+  /**
+   * Deserializes the task (always instantiated as a {@link BSPTask}) and then
+   * the superstep number, in the order produced by
+   * {@link #write(DataOutput)}.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    task = new BSPTask();
+    task.readFields(in);
+    superstepNumber.readFields(in);
+  }
+}

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java Sun Aug  5 11:07:48 2012
@@ -43,6 +43,7 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.sync.ZKSyncBSPMasterClient;
 import org.apache.hama.ipc.GroomProtocol;
 import org.apache.hama.monitor.Federator;
 import org.apache.hama.monitor.Federator.Act;
@@ -121,6 +122,12 @@ class SimpleTaskScheduler extends TaskSc
     public void jobRemoved(JobInProgress job) throws IOException {
       queueManager.get().moveJob(PROCESSING_QUEUE, FINISHED_QUEUE, job);
     }
+
+    @Override
+    public void recoverTaskInJob(JobInProgress job) throws IOException {
+      queueManager.get().addJob(WAIT_QUEUE, job);
+    }
+
   }
 
   private class JobProcessor extends Thread implements Schedulable {
@@ -221,11 +228,10 @@ class SimpleTaskScheduler extends TaskSc
         throw new NullPointerException("No job is specified.");
     }
 
-    @Override
-    public Boolean call() {
+    private Boolean scheduleNewTasks() {
 
       // Action to be sent for each task to the respective groom server.
-      Map<GroomServerStatus, List<LaunchTaskAction>> actionMap = new HashMap<GroomServerStatus, List<LaunchTaskAction>>(
+      Map<GroomServerStatus, List<GroomServerAction>> actionMap = new HashMap<GroomServerStatus, List<GroomServerAction>>(
           2 * this.groomStatuses.size());
       Set<Task> taskSet = new HashSet<Task>(2 * jip.tasks.length);
       Task t = null;
@@ -240,6 +246,7 @@ class SimpleTaskScheduler extends TaskSc
 
       // if all tasks could not be scheduled
       if (cnt != this.jip.tasks.length) {
+        LOG.error("Could not schedule all tasks!");
         return Boolean.FALSE;
       }
 
@@ -248,21 +255,59 @@ class SimpleTaskScheduler extends TaskSc
       while (taskIter.hasNext()) {
         Task task = taskIter.next();
         GroomServerStatus groomStatus = jip.getGroomStatusForTask(task);
-        List<LaunchTaskAction> taskActions = actionMap.get(groomStatus);
+        List<GroomServerAction> taskActions = actionMap.get(groomStatus);
         if (taskActions == null) {
-          taskActions = new ArrayList<LaunchTaskAction>(
+          taskActions = new ArrayList<GroomServerAction>(
               groomStatus.getMaxTasks());
         }
         taskActions.add(new LaunchTaskAction(task));
         actionMap.put(groomStatus, taskActions);
       }
 
+      // Report dispatch failures to the caller instead of claiming success
+      // for a job whose tasks did not all reach their grooms.
+      return sendDirectivesToGrooms(actionMap);
+    }
+
+    /**
+     * Schedules the recovery of the job's tasks: asks the job for the recovery
+     * directives to send to each groom server and dispatches them.
+     * 
+     * @return TRUE if the directives were computed and dispatched, else FALSE
+     */
+    private Boolean scheduleRecoveryTasks() {
+
+      // Action to be sent for each task to the respective groom server.
+      Map<GroomServerStatus, List<GroomServerAction>> actionMap = new HashMap<GroomServerStatus, List<GroomServerAction>>(
+          2 * this.groomStatuses.size());
+
+      try {
+        jip.recoverTasks(groomStatuses, actionMap);
+      } catch (IOException e) {
+        LOG.error("Could not recover tasks for job " + jip.getJobID(), e);
+        return Boolean.FALSE;
+      }
+      return sendDirectivesToGrooms(actionMap);
+    }
+
+    /**
+     * Sends the given actions to their groom servers for as long as the job
+     * is RUNNING or RECOVERING.
+     * 
+     * @param actionMap actions to send, keyed by the target groom server
+     * @return TRUE if the actions of every groom were dispatched; FALSE if a
+     *         dispatch failed or the job left the RUNNING/RECOVERING state
+     *         before all grooms were served
+     */
+    private Boolean sendDirectivesToGrooms(
+        Map<GroomServerStatus, List<GroomServerAction>> actionMap) {
       Iterator<GroomServerStatus> groomIter = actionMap.keySet().iterator();
-      while (jip.getStatus().getRunState() == JobStatus.RUNNING
+      while ((jip.getStatus().getRunState() == JobStatus.RUNNING
+          || jip.getStatus().getRunState() == JobStatus.RECOVERING)
           && groomIter.hasNext()) {
 
         GroomServerStatus groomStatus = groomIter.next();
-        List<LaunchTaskAction> actionList = actionMap.get(groomStatus);
+        List<GroomServerAction> actionList = actionMap.get(groomStatus);
 
         GroomProtocol worker = groomServerManager.get().findGroomServer(
             groomStatus);
@@ -276,18 +321,36 @@ class SimpleTaskScheduler extends TaskSc
           LOG.error(
               "Fail to dispatch tasks to GroomServer "
                   + groomStatus.getGroomName(), ioe);
+          return Boolean.FALSE;
         }
 
       }
 
       if (groomIter.hasNext()
-          && jip.getStatus().getRunState() != JobStatus.RUNNING) {
+          && jip.getStatus().getRunState() != JobStatus.RUNNING
+          && jip.getStatus().getRunState() != JobStatus.RECOVERING) {
         LOG.warn("Currently master only shcedules job in running state. "
             + "This may be refined in the future. JobId:" + jip.getJobID());
+        return Boolean.FALSE;
       }
 
       return Boolean.TRUE;
     }
+
+    /**
+     * Schedules the job's tasks: the recovery tasks when a recovery is
+     * pending, otherwise the job's initial tasks.
+     * 
+     * @return TRUE if scheduling succeeded, FALSE otherwise
+     */
+    @Override
+    public Boolean call() {
+      if (jip.isRecoveryPending()) {
+        return scheduleRecoveryTasks();
+      } else {
+        return scheduleNewTasks();
+      }
+    }
   }
 
   /**
@@ -365,8 +428,10 @@ class SimpleTaskScheduler extends TaskSc
     this.jobProcessor.start();
     if (null != getConf()
         && getConf().getBoolean("bsp.federator.enabled", false)) {
-      this.scheduler.scheduleAtFixedRate(new JvmCollector(federator.get(),
-          ((BSPMaster) groomServerManager.get()).zk), 5, 5, SECONDS);
+      this.scheduler.scheduleAtFixedRate(
+          new JvmCollector(federator.get(),
+              ((ZKSyncBSPMasterClient) ((BSPMaster) groomServerManager.get())
+                  .getSyncClient()).getZK()), 5, 5, SECONDS);
     }
 
     if (null != monitorManager.get()) {

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java Sun Aug  5 11:07:48 2012
@@ -34,7 +34,7 @@ import org.apache.hama.bsp.BSPJobClient.
  * TaskInProgress maintains all the info needed for a Task in the lifetime of
  * its owning Job.
  */
-class TaskInProgress {
+public class TaskInProgress {
   public static final Log LOG = LogFactory.getLog(TaskInProgress.class);
 
   private Configuration conf;
@@ -48,7 +48,6 @@ class TaskInProgress {
   // Job Meta
   private String jobFile = null;
   private int partition;
-  private BSPMaster bspMaster;
   private TaskID id;
   private JobInProgress job;
   private int completes = 0;
@@ -69,6 +68,8 @@ class TaskInProgress {
 
   // The first taskid of this tip
   private TaskAttemptID firstTaskId;
+  
+  private TaskAttemptID currentTaskId;
 
   // Map from task Id -> GroomServer Id, contains tasks that are
   // currently runnings
@@ -84,6 +85,8 @@ class TaskInProgress {
 
   private RawSplit rawSplit;
 
+  private int mySuperstep = -1;
+
   /**
    * Constructor for new nexus between BSPMaster and GroomServer.
    * 
@@ -99,12 +102,20 @@ class TaskInProgress {
     init(jobId);
   }
 
+  /**
+   * 
+   * @param jobId
+   * @param jobFile
+   * @param rawSplit
+   * @param conf
+   * @param job
+   * @param partition
+   */
   public TaskInProgress(BSPJobID jobId, String jobFile, RawSplit rawSplit,
-      BSPMaster master, Configuration conf, JobInProgress job, int partition) {
+      Configuration conf, JobInProgress job, int partition) {
     this.jobId = jobId;
     this.jobFile = jobFile;
     this.rawSplit = rawSplit;
-    this.setBspMaster(master);
     this.job = job;
     this.setConf(conf);
     this.partition = partition;
@@ -112,18 +123,179 @@ class TaskInProgress {
     init(jobId);
   }
 
+  /**
+   * Derives this tip's {@link TaskID} from the job id and partition, and
+   * records the start time.
+   * 
+   * @param jobId id of the job this tip belongs to
+   */
   private void init(BSPJobID jobId) {
     this.id = new TaskID(jobId, partition);
     this.startTime = System.currentTimeMillis();
   }
 
   /**
-   * Return a Task that can be sent to a GroomServer for execution.
+   * Returns the host name of the first groom among {@code possibleLocations}
+   * that still has a free task slot.
+   * 
+   * @param grooms active grooms, looked up by host name
+   * @param tasksInGroomMap number of tasks already assigned to each groom
+   * @param possibleLocations candidate host names, tried in order
+   * @return the chosen host name, or null if no candidate has a free slot
+   */
+  private String getGroomToSchedule(Map<String, GroomServerStatus> grooms,
+      Map<GroomServerStatus, Integer> tasksInGroomMap,
+      String[] possibleLocations) {
+    if (possibleLocations == null) {
+      return null;
+    }
+    for (String location : possibleLocations) {
+      GroomServerStatus groom = grooms.get(location);
+      if (groom == null) {
+        continue;
+      }
+      Integer taskInGroom = tasksInGroomMap.get(groom);
+      int assigned = (taskInGroom == null) ? 0 : taskInGroom;
+      if (assigned < groom.getMaxTasks()
+          && location.equals(groom.getGroomHostName())) {
+        return groom.getGroomHostName();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Returns the host name of any groom that still has a free task slot.
+   * 
+   * @param grooms active grooms, looked up by host name
+   * @param tasksInGroomMap number of tasks already assigned to each groom
+   * @return the chosen host name, or null if every groom is full
+   */
+  private String getAnyGroomToSchedule(Map<String, GroomServerStatus> grooms,
+      Map<GroomServerStatus, Integer> tasksInGroomMap) {
+    for (GroomServerStatus groom : grooms.values()) {
+      if (groom == null) {
+        continue;
+      }
+      Integer taskInGroom = tasksInGroomMap.get(groom);
+      int assigned = (taskInGroom == null) ? 0 : taskInGroom;
+      if (assigned < groom.getMaxTasks()) {
+        return groom.getGroomHostName();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Creates a {@link BSPTask} for the given attempt of this tip, carrying the
+   * tip's input split (class name and serialized bytes) when it has one.
+   * 
+   * @param taskid the attempt id of the new task
+   * @return the new task
+   */
+  private Task newTask(TaskAttemptID taskid) {
+    String splitClass = null;
+    BytesWritable split = null;
+    if (rawSplit != null) {
+      splitClass = rawSplit.getClassName();
+      split = rawSplit.getBytes();
+    }
+    return new BSPTask(jobId, jobFile, taskid, partition, splitClass, split);
+  }
+
+  /**
+   * Creates a new attempt of this tip to run on the given groom and records
+   * it as the current attempt.
+   * 
+   * @param groomStatus the groom the attempt is assigned to
+   * @return the task to send to the groom, or null if {@code groomStatus} is
+   *         null or no further attempt may be started
    */
-  public Task getTaskToRun(Map<String, GroomServerStatus> grooms,
-      Map<GroomServerStatus, Integer> tasksInGroomMap) throws IOException {
+  public Task constructTask(GroomServerStatus groomStatus) {
+    if (groomStatus == null) {
+      return null;
+    }
+    TaskAttemptID taskId = computeTaskId();
+    if (taskId == null) {
+      return null;
+    }
+    currentTaskId = taskId;
+    Task t = newTask(taskId);
+    activeTasks.put(taskId, groomStatus.getGroomHostName());
+    myGroomStatus = groomStatus;
+    return t;
+  }
+
+  /**
+   * Assigns the given attempt to a groom, preferring one of
+   * {@code possibleLocations} and falling back to any groom with a free slot.
+   * 
+   * @return the new task, or null if no groom has a free slot
+   */
+  private Task getGroomForRecoverTaskInHosts(TaskAttemptID taskid,
+      Map<String, GroomServerStatus> grooms,
+      Map<GroomServerStatus, Integer> tasksInGroomMap,
+      String[] possibleLocations) {
     Task t = null;
+    String groomName = getGroomToSchedule(grooms, tasksInGroomMap,
+        possibleLocations);
+    if (groomName == null) {
+      groomName = getAnyGroomToSchedule(grooms, tasksInGroomMap);
+    }
+    if (groomName != null) {
+      t = newTask(taskid);
+      activeTasks.put(taskid, groomName);
+      myGroomStatus = grooms.get(groomName);
+    }
+    return t;
+  }
+
+  /**
+   * Creates a new attempt of this tip to recover it. The slot held by the
+   * previous attempt on its groom is first released in
+   * {@code tasksInGroomMap}.
+   * 
+   * @param grooms active grooms, looked up by host name
+   * @param tasksInGroomMap number of tasks assigned to each groom; modified
+   * @param hostNames preferred hosts for the recovered attempt
+   * @return the recovery task, or null if no further attempt may be started
+   *         or no groom has a free slot
+   * @throws IOException declared for callers; not thrown here
+   */
+  public Task getRecoveryTask(Map<String, GroomServerStatus> grooms,
+      Map<GroomServerStatus, Integer> tasksInGroomMap, String[] hostNames)
+      throws IOException {
+    Integer count = tasksInGroomMap.get(myGroomStatus);
+    if (count != null) {
+      tasksInGroomMap.put(myGroomStatus, count - 1);
+    }
+
+    TaskAttemptID taskId = computeTaskId();
+    LOG.debug("Recovering task = " + String.valueOf(taskId));
+    if (taskId == null) {
+      return null;
+    }
+    // NOTE(review): unlike constructTask(), this path does not update
+    // currentTaskId; confirm whether getCurrentTaskAttemptId() should
+    // report the recovery attempt.
+    return getGroomForRecoverTaskInHosts(taskId, grooms, tasksInGroomMap,
+        hostNames);
+  }
 
+  /**
+   * @return true if this tip may still start another attempt
+   */
+  public boolean canStartTask() {
+    return (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts));
+  }
+
+  /**
+   * Computes the id of the next attempt of this tip.
+   * 
+   * @return the attempt id, or null if the maximum number of attempts has
+   *         been reached
+   */
+  private TaskAttemptID computeTaskId() {
     TaskAttemptID taskid = null;
     if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts)) {
       int attemptId = job.getNumRestarts() * NUM_ATTEMPTS_PER_RESTART
@@ -135,54 +307,9 @@ class TaskInProgress {
           + " attempts for the tip '" + getTIPId() + "'");
       return null;
     }
-
-    String splitClass = null;
-    BytesWritable split = null;
-    GroomServerStatus selectedGroom = null;
-    if (rawSplit != null) {
-      splitClass = rawSplit.getClassName();
-      split = rawSplit.getBytes();
-      String[] possibleLocations = rawSplit.getLocations();
-      for (int i = 0; i < possibleLocations.length; ++i) {
-        String location = possibleLocations[i];
-        GroomServerStatus groom = grooms.get(location);
-        if (groom == null) {
-          LOG.error("Could not find groom for location: " + location
-              + " ; active grooms: " + grooms.keySet());
-          continue;
-        }
-        Integer taskInGroom = tasksInGroomMap.get(groom);
-        taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
-        if (taskInGroom < groom.getMaxTasks()
-            && location.equals(groom.getGroomHostName())) {
-          selectedGroom = groom;
-          t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split);
-          activeTasks.put(taskid, groom.getGroomName());
-
-          break;
-        }
-      }
-    }
-    // Failed in attempt to get data locality or there was no input split.
-    if (selectedGroom == null) {
-      Iterator<String> groomIter = grooms.keySet().iterator();
-      while (groomIter.hasNext()) {
-        GroomServerStatus groom = grooms.get(groomIter.next());
-        Integer taskInGroom = tasksInGroomMap.get(groom);
-        taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
-        if (taskInGroom < groom.getMaxTasks()) {
-          selectedGroom = groom;
-          t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split);
-          activeTasks.put(taskid, groom.getGroomName());
-        }
-      }
-    }
-
-    myGroomStatus = selectedGroom;
-
-    return t;
+    return taskid;
   }
 
   // //////////////////////////////////
   // Accessors
   // //////////////////////////////////
@@ -344,20 +471,6 @@ class TaskInProgress {
   }
 
   /**
-   * @param bspMaster the bspMaster to set
-   */
-  public void setBspMaster(BSPMaster bspMaster) {
-    this.bspMaster = bspMaster;
-  }
-
-  /**
-   * @return the bspMaster
-   */
-  public BSPMaster getBspMaster() {
-    return bspMaster;
-  }
-
-  /**
    * Set the event number that was raised for this tip
    */
   public void setSuccessEventNumber(int eventNumber) {
@@ -382,4 +495,35 @@ class TaskInProgress {
     return taskStatuses.get(taskid).getGroomServer();
   }
 
+  /**
+   * @return the superstep recorded for this tip; -1 until set
+   */
+  public int getSuperstep() {
+    return mySuperstep;
+  }
+
+  /**
+   * @param mySuperstep the superstep to record for this tip
+   */
+  public void setSuperstep(int mySuperstep) {
+    this.mySuperstep = mySuperstep;
+  }
+
+  // TODO: In future this should be extended to the list of resources that the
+  // task requires.
+  /**
+   * @return the raw input split of this tip, or null if it has none
+   */
+  public RawSplit getFileSplit() {
+    return this.rawSplit;
+  }
+
+  /**
+   * @return id of the attempt most recently created by
+   *         {@link #constructTask(GroomServerStatus)}, or null if none
+   */
+  public TaskAttemptID getCurrentTaskAttemptId() {
+    return this.currentTaskId;
+  }
+
 }

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/TaskRunner.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/TaskRunner.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/TaskRunner.java Sun Aug  5 11:07:48 2012
@@ -122,8 +122,9 @@ public class TaskRunner extends Thread {
 
         int exit_code = bspProcess.waitFor();
         if (!bspKilled && exit_code != 0) {
+          
           throw new IOException("BSP task process exit with nonzero status of "
-              + exit_code + ".");
+              + exit_code + ". command = " + commands);
         }
       } catch (InterruptedException e) {
         LOG.warn("Thread is interrupted when execeuting BSP process.", e);
@@ -223,6 +224,17 @@ public class TaskRunner extends Thread {
       vargs.add(Integer.toString(addr.getPort()));
       vargs.add(task.getTaskID().toString());
       vargs.add(groomServer.groomHostName);
+      vargs.add(Long.toString(groomServer.getStartSuperstep(task.getTaskID())));
+      TaskStatus status = groomServer.getTaskStatus(task.getTaskID());
+      
+      if(status != null && 
+          TaskStatus.State.RECOVERING.equals(status.getRunState())){
+        vargs.add(TaskStatus.State.RECOVERING.name());
+      }
+      else{
+        vargs.add(TaskStatus.State.RUNNING.name());
+      }
+      
     }
     return vargs;
   }
@@ -285,6 +297,7 @@ public class TaskRunner extends Thread {
     if (bspProcess != null) {
       bspProcess.destroy();
     }
+    
   }
 
   /**

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/TaskStatus.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/TaskStatus.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/TaskStatus.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/TaskStatus.java Sun Aug  5 11:07:48 2012
@@ -37,13 +37,14 @@ public class TaskStatus implements Writa
 
   // enumeration for reporting current phase of a task.
   public static enum Phase {
-    STARTING, COMPUTE, BARRIER_SYNC, CLEANUP
+    STARTING, COMPUTE, BARRIER_SYNC, CLEANUP, RECOVERING
   }
 
   // what state is the task in?
   public static enum State {
     RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, COMMIT_PENDING,
-    FAILED_UNCLEAN, KILLED_UNCLEAN
+    FAILED_UNCLEAN, KILLED_UNCLEAN, FAULT_NOTIFIED, RECOVERY_SCHEDULING,
+    RECOVERY_SCHEDULED, RECOVERING
   }
 
   private BSPJobID jobId;

Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/UpdatePeerAction.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/UpdatePeerAction.java?rev=1369551&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/UpdatePeerAction.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/UpdatePeerAction.java Sun Aug  5 11:07:48 2012
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster}
+ * to the {@link org.apache.hama.bsp.GroomServer} to update what a task
+ * knows about one of its peers: the peer task {@link #getPeerTaskID()} is
+ * associated with the groom {@link #getGroomName()} (presumably the groom
+ * the peer was recovered on).
+ */
+class UpdatePeerAction extends GroomServerAction {
+  /** The task whose peer information is updated. */
+  TaskAttemptID taskId;
+  /** The peer task the update refers to. */
+  TaskAttemptID peerTaskId;
+  /** Name of the groom associated with the peer task. */
+  Text groomName;
+
+  /**
+   * Creates an empty action to be populated by
+   * {@link #readFields(DataInput)}. Every field is allocated here because
+   * readFields deserializes into the existing instances.
+   */
+  public UpdatePeerAction() {
+    super(ActionType.UPDATE_PEER);
+    taskId = new TaskAttemptID();
+    peerTaskId = new TaskAttemptID();
+    groomName = new Text("");
+  }
+
+  /**
+   * @param taskId the task whose peer information is updated
+   * @param peerTaskId the peer task the update refers to
+   * @param groom name of the groom associated with the peer task
+   */
+  public UpdatePeerAction(TaskAttemptID taskId, TaskAttemptID peerTaskId,
+      String groom) {
+    super(ActionType.UPDATE_PEER);
+    this.taskId = taskId;
+    this.peerTaskId = peerTaskId;
+    this.groomName = new Text(groom);
+  }
+
+  /** @return the task whose peer information is updated */
+  public TaskAttemptID getTaskID() {
+    return taskId;
+  }
+
+  /** @return the peer task the update refers to */
+  public TaskAttemptID getPeerTaskID() {
+    return peerTaskId;
+  }
+
+  /** @return name of the groom associated with the peer task */
+  public String getGroomName() {
+    return groomName.toString();
+  }
+
+  /** Writes the task id, the peer task id and the groom name, in order. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    taskId.write(out);
+    peerTaskId.write(out);
+    groomName.write(out);
+  }
+
+  /** Reads the fields back in the order used by {@link #write}. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    taskId.readFields(in);
+    peerTaskId.readFields(in);
+    groomName.readFields(in);
+  }
+}



Mime
View raw message