hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1599853 - in /hama/trunk: CHANGES.txt core/src/main/java/org/apache/hama/bsp/BSPMaster.java mesos/src/main/java/org/apache/hama/bsp/ResourceManager.java mesos/src/main/java/org/apache/hama/bsp/TaskDelegator.java
Date Wed, 04 Jun 2014 03:46:27 GMT
Author: edwardyoon
Date: Wed Jun  4 03:46:27 2014
New Revision: 1599853

URL: http://svn.apache.org/r1599853
Log:
HAMA-909: Improve Mesos Scheduler's Fault Tolerance (Jeff Fenchel via edwardyoon)

Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
    hama/trunk/mesos/src/main/java/org/apache/hama/bsp/ResourceManager.java
    hama/trunk/mesos/src/main/java/org/apache/hama/bsp/TaskDelegator.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1599853&r1=1599852&r2=1599853&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Wed Jun  4 03:46:27 2014
@@ -17,6 +17,7 @@ Release 0.7.0 (unreleased changes)
 
   IMPROVEMENTS
 
+   HAMA-909: Improve Mesos Scheduler's Fault Tolerance (Jeff Fenchel via edwardyoon)
    HAMA-823: Remove javadoc warnings (Victor Lee via edwardyoon)  
    HAMA-886: Refactoring core.bundle package (edwardyoon)
    HAMA-899: Add getAdjacentPeerNames() that returns the names of locally adjacent peers (edwardyoon)

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java?rev=1599853&r1=1599852&r2=1599853&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java Wed Jun  4 03:46:27 2014
@@ -200,6 +200,13 @@ public class BSPMaster implements JobSub
               } else {
                 jip.status.setRunState(JobStatus.FAILED);
                 jip.failedTask(tip, ts);
+                for (JobInProgressListener listener : jobInProgressListeners) {
+                  try {
+                    listener.jobRemoved(jip);
+                  } catch (IOException ioe) {
+                    LOG.error("Fail to alter scheduler a job is moved.", ioe);
+                  }
+                }
               }
             }
             if (jip.getStatus().getRunState() == JobStatus.SUCCEEDED) {
@@ -224,6 +231,14 @@ public class BSPMaster implements JobSub
                 throw new DirectiveException("Error when dispatching kill task"
                     + " action.", ioe);
               }
+              
+              for (JobInProgressListener listener : jobInProgressListeners) {
+                try {
+                  listener.jobRemoved(jip);
+                } catch (IOException ioe) {
+                  LOG.error("Fail to alter scheduler a job is moved.", ioe);
+                }
+              }
             }
           }
         } else {

Modified: hama/trunk/mesos/src/main/java/org/apache/hama/bsp/ResourceManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/mesos/src/main/java/org/apache/hama/bsp/ResourceManager.java?rev=1599853&r1=1599852&r2=1599853&view=diff
==============================================================================
--- hama/trunk/mesos/src/main/java/org/apache/hama/bsp/ResourceManager.java (original)
+++ hama/trunk/mesos/src/main/java/org/apache/hama/bsp/ResourceManager.java Wed Jun  4 03:46:27 2014
@@ -23,12 +23,14 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -36,7 +38,6 @@ import java.util.regex.Pattern;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hama.bsp.TaskWorkerManager.TaskWorker;
-import org.apache.hama.bsp.message.io.SyncReadByteBufferInputStream;
 import org.apache.mesos.Protos;
 import org.apache.mesos.SchedulerDriver;
 import org.apache.mesos.Protos.CommandInfo;
@@ -59,11 +60,13 @@ public class ResourceManager {
   private Configuration conf;
   private static long launchedTasks = 0;
 
-  private Set<JobInProgress> executing = new HashSet<JobInProgress>();
-  private Set<TaskInProgress> executingTasks = new HashSet<TaskInProgress>();
+  private Set<JobInProgress> executingJobs =  Collections.synchronizedSet(new HashSet<JobInProgress>());
+  private Set<TaskInProgress> executingTasks = Collections.synchronizedSet(new HashSet<TaskInProgress>());
   private Map<String, java.util.Queue<TaskInProgress>> tasksToRunByGroom;
   private Set<TaskInProgress> tasksToRun;
 
+  private Set<TaskInProgress> recoveryTasks = Collections.synchronizedSet(new HashSet<TaskInProgress>());
+
   private long slotMemory;
   // Overhead requirements for the container groom server
   double groomCpus;
@@ -75,44 +78,40 @@ public class ResourceManager {
   /**
    * Constructor for the mesos resource manager
    * 
-   * @param conf
-   *          The configuration options for hama
-   * @param serverManager
-   *          A reference to the groom server manager
-   * @param driver
-   *          The mesos driver. This is required to terminate tasks
+   * @param conf The configuration options for hama
+   * @param serverManager A reference to the groom server manager
+   * @param driver The mesos driver. This is required to terminate tasks
    */
   public ResourceManager(Configuration conf,
       AtomicReference<GroomServerManager> serverManager, SchedulerDriver driver) {
-    tasksToRunByGroom = new HashMap<String, java.util.Queue<TaskInProgress>>();
-    tasksToRunByGroom.put(anyGroomServer, new LinkedList<TaskInProgress>());
+    tasksToRunByGroom = new ConcurrentHashMap<String, java.util.Queue<TaskInProgress>>();
+    tasksToRunByGroom.put(anyGroomServer, new ConcurrentLinkedQueue<TaskInProgress>());
     tasksToRun = new HashSet<TaskInProgress>();
 
     slotMemory = parseMemory(conf);
 
-    taskDelegator = new TaskDelegator(serverManager, driver, executingTasks);
+    taskDelegator = new TaskDelegator(serverManager, driver, recoveryTasks);
     serverManager.get().addGroomStatusListener(taskDelegator);
     this.conf = conf;
-    
+
     groomCpus = conf.getInt("hama.mesos.groom.cpu", 0);
-    groomMem = conf.getInt("hama.mesos.groom.mem", 200);;
-    groomDisk = conf.getInt("hama.mesos.groom.disk", 0);;
+    groomMem = conf.getInt("hama.mesos.groom.mem", 200);
+    groomDisk = conf.getInt("hama.mesos.groom.disk", 0);
   }
 
   /**
    * Handle a resource offer by the mesos framework
    * 
-   * @param schedulerDriver
-   *          The mesos scheduler driver
-   * @param offers
-   *          A list of offers from mesos
+   * @param schedulerDriver The mesos scheduler driver
+   * @param offers A list of offers from mesos
    */
   public void resourceOffers(SchedulerDriver schedulerDriver, List<Offer> offers) {
 
     if (tasksToRun.isEmpty()) {
-      //there is no need to track executing tasks if everything is started
+      // there is no need to track executing tasks if everything is
+      // started
       clearQueues();
-      
+
       for (Offer offer : offers) {
         schedulerDriver.declineOffer(offer.getId());
       }
@@ -124,14 +123,14 @@ public class ResourceManager {
   }
 
   private void clearQueues() {
-	synchronized (tasksToRunByGroom) {
-	  for ( java.util.Queue<TaskInProgress> queue : tasksToRunByGroom.values()) {
-		  queue.clear();
-	  }
-	  executingTasks.clear();
-	}
+    synchronized (tasksToRunByGroom) {
+      for (java.util.Queue<TaskInProgress> queue : tasksToRunByGroom.values()) {
+        queue.clear();
+      }
+      executingTasks.clear();
+    }
   }
-  
+
   private void useOffer(SchedulerDriver schedulerDriver, Offer offer) {
     log.debug("Received offer From: " + offer.getHostname());
 
@@ -189,31 +188,31 @@ public class ResourceManager {
     @Override
     public Boolean call() throws Exception {
       log.debug("Task Worker called: " + jip.tasks.length);
-      if (!jip.isRecoveryPending()) {
-        for (TaskInProgress tip : jip.tasks) {
-          String[] grooms = jip.getPreferredGrooms(tip, null, null);
 
-          if (grooms == null) {
-            grooms = new String[] { anyGroomServer };
-          }
-          log.info("Prefered Groom for tip " + tip.idWithinJob() + ": "
-              + grooms[0]);
-      	  synchronized (tasksToRunByGroom) {
-            for (String groom : grooms) {
-              if (!tasksToRunByGroom.containsKey(groom)) {
-                tasksToRunByGroom.put(groom, new LinkedList<TaskInProgress>());
-                log.info("Received request for groom: " + groom);
-              }
-              tasksToRunByGroom.get(groom).add(tip);
+      for (TaskInProgress tip : jip.tasks) {
+        if (jip.isRecoveryPending()) {
+          recoveryTasks.add(tip);
+        }
+        String[] grooms = jip.getPreferredGrooms(tip, null, null);
+
+        if (grooms == null) {
+          grooms = new String[] { anyGroomServer };
+        }
+        log.info("Prefered Groom for tip " + tip.idWithinJob() + ": "
+            + grooms[0]);
+        synchronized (tasksToRunByGroom) {
+          for (String groom : grooms) {
+            if (!tasksToRunByGroom.containsKey(groom)) {
+              tasksToRunByGroom.put(groom, new ConcurrentLinkedQueue<TaskInProgress>());
+              log.info("Received request for groom: " + groom);
             }
-            tasksToRun.add(tip);
-      	  }
+            tasksToRunByGroom.get(groom).add(tip);
+          }
+          tasksToRun.add(tip);
         }
-      } else {
-    	  throw new UnsupportedOperationException("This feature is not yet implemented");
-        //TODO: Handle task recovery
       }
-      executing.add(jip);
+
+      executingJobs.add(jip);
       return true;
     }
   }
@@ -421,8 +420,7 @@ public class ResourceManager {
   /**
    * Get the amount of memory requested in MiB
    * 
-   * @param javaOpts
-   *          java options
+   * @param javaOpts java options
    * @return mesos formated memory argument
    */
   private static long parseMemory(Configuration conf) {
@@ -439,7 +437,8 @@ public class ResourceManager {
         value = value * 1024;
       }
 
-      // remove memory request from the child java opts so it may be added later
+      // remove memory request from the child java opts so it may be added
+      // later
       conf.set("bsp.child.java.opts", memMatcher.replaceAll(""));
 
       return value;

Modified: hama/trunk/mesos/src/main/java/org/apache/hama/bsp/TaskDelegator.java
URL: http://svn.apache.org/viewvc/hama/trunk/mesos/src/main/java/org/apache/hama/bsp/TaskDelegator.java?rev=1599853&r1=1599852&r2=1599853&view=diff
==============================================================================
--- hama/trunk/mesos/src/main/java/org/apache/hama/bsp/TaskDelegator.java (original)
+++ hama/trunk/mesos/src/main/java/org/apache/hama/bsp/TaskDelegator.java Wed Jun  4 03:46:27 2014
@@ -18,8 +18,9 @@
 package org.apache.hama.bsp;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
@@ -35,14 +36,15 @@ import org.apache.mesos.SchedulerDriver;
 public class TaskDelegator implements GroomStatusListener {
   public static final Log LOG = LogFactory.getLog(MesosScheduler.class);
 
-  private Set<TaskInProgress> executingTasks;
+  private Set<TaskInProgress> recoveryTasks;
 
-  
   /**
    * Map to hold assignments from groomServerNames to TasksInProgress
    */
   private MultiValueMap assignments = new MultiValueMap();
 
+  private MultiValueMap jobAssignments = new MultiValueMap();
+
   private AtomicReference<GroomServerManager> groomServerManager;
 
   /**
@@ -56,10 +58,12 @@ public class TaskDelegator implements Gr
   private SchedulerDriver driver;
 
   public TaskDelegator(AtomicReference<GroomServerManager> groomServerManager,
-      SchedulerDriver driver, Set<TaskInProgress> executingTasks) {
+      SchedulerDriver driver, Set<TaskInProgress> recoveryTasks) {
     this.groomServerManager = groomServerManager;
+    groomServerManager.get().addJobInProgressListener(
+        new TaskDelegatorJobListener());
     this.driver = driver;
-    this.executingTasks = executingTasks;
+    this.recoveryTasks = recoveryTasks;
   }
 
   @Override
@@ -82,10 +86,8 @@ public class TaskDelegator implements Gr
   /**
    * Add a task for execution when the groom server becomes available
    * 
-   * @param tip
-   *          The TaskInProgress to execute
-   * @param hostName
-   *          The hostname where the resource reservation was made
+   * @param tip The TaskInProgress to execute
+   * @param hostName The hostname where the resource reservation was made
    */
   public void addTask(TaskInProgress tip, Protos.TaskID taskId,
       String hostName, Integer port) {
@@ -99,16 +101,37 @@ public class TaskDelegator implements Gr
       execute(tip, groomServers.get(key));
     } else {
       assignments.put(key, tip);
+      jobAssignments.put(tip.getJob(), new Pair<Object, Object>(key, tip));
     }
   }
 
   private void execute(TaskInProgress tip, GroomServerStatus status) {
     Task task = tip.constructTask(status);
 
+    GroomServerAction[] actions;
     GroomProtocol worker = groomServerManager.get().findGroomServer(status);
 
-    GroomServerAction[] actions = new GroomServerAction[1];
-    actions[0] = new LaunchTaskAction(task);
+    if (!recoveryTasks.contains(tip)) {
+      actions = new GroomServerAction[1];
+      actions[0] = new LaunchTaskAction(task);
+    } else {
+      LOG.trace("Executing a recovery task");
+      recoveryTasks.remove(tip);
+      HashMap<String, GroomServerStatus> groomStatuses = new HashMap<String, GroomServerStatus>(
+          1);
+      groomStatuses.put(status.hostName, status);
+      Map<GroomServerStatus, List<GroomServerAction>> actionMap = new HashMap<GroomServerStatus,
List<GroomServerAction>>(
+          2 * groomStatuses.size());
+      try {
+        tip.getJob().recoverTasks(groomStatuses, actionMap);
+      } catch (IOException e) {
+        LOG.warn("Task recovery failed", e);
+      }
+
+      List<GroomServerAction> actionList = actionMap.get(status);
+      actions = new GroomServerAction[actionList.size()];
+      actionList.toArray(actions);
+    }
     Directive d1 = new DispatchTasksDirective(actions);
     try {
       worker.dispatch(d1);
@@ -125,10 +148,41 @@ public class TaskDelegator implements Gr
             status.rpcServer).getPort());
     groomServers.put(key, status);
     assignments.remove(key, task);
-    
+    jobAssignments.remove(task.getJob(), new Pair<Object, Object>(key, task));
+
     if (assignments.getCollection(key) == null) {
       groomServers.remove(key);
       driver.killTask(groomTaskIDs.get(key));
     }
   }
+
+  private class TaskDelegatorJobListener extends JobInProgressListener {
+
+    @Override
+    public void jobAdded(JobInProgress job) throws IOException {
+
+    }
+
+    @Override
+    public void jobRemoved(JobInProgress job) throws IOException {
+      @SuppressWarnings("unchecked")
+      Collection<Pair<Object, Object>> remainingTasks = jobAssignments
+          .getCollection(job);
+      if (remainingTasks != null) {
+        for (Pair<Object, Object> taskToRemove : remainingTasks) {
+          assignments.remove(taskToRemove.getKey(), taskToRemove.getValue());
+          if (assignments.getCollection(taskToRemove.getKey()) == null) {
+            groomServers.remove(taskToRemove.getKey());
+            driver.killTask(groomTaskIDs.get(taskToRemove.getKey()));
+          }
+        }
+        jobAssignments.remove(job);
+      }
+    }
+
+    @Override
+    public void recoverTaskInJob(JobInProgress job) throws IOException {
+
+    }
+  }
 }



Mime
View raw message