incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1309761 - in /incubator/hama/trunk: CHANGES.txt core/src/main/java/org/apache/hama/bsp/JobInProgress.java core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
Date Thu, 05 Apr 2012 11:21:51 GMT
Author: edwardyoon
Date: Thu Apr  5 11:21:50 2012
New Revision: 1309761

URL: http://svn.apache.org/viewvc?rev=1309761&view=rev
Log:
should make an attempt to start the task on the host that has the input split located on it

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1309761&r1=1309760&r2=1309761&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Thu Apr  5 11:21:50 2012
@@ -16,6 +16,8 @@ Release 0.5 - Unreleased
 
   IMPROVEMENTS
 
+    HAMA-543: Make best effort to start BSP Task on the host 
+                where the input split is located. (Suraj Menon via edwardyoon)
     HAMA-527: Update commons-configuration version (edwardyoon)
     HAMA-499: Refactor clearZKNodes() in BSPMaster (Apurv Verma via tjungblut)
     HAMA-485: Fill Counters with useful information (tjungblut)

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java?rev=1309761&r1=1309760&r2=1309761&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java Thu Apr  5 11:21:50 2012
@@ -19,6 +19,9 @@ package org.apache.hama.bsp;
 
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -59,6 +62,7 @@ class JobInProgress {
   Path jobFile = null;
   Path localJobFile = null;
   Path localJarFile = null;
+
   private LocalFileSystem localFs;
   // Indicates how many times the job got restarted
   private int restartCount;
@@ -81,6 +85,10 @@ class JobInProgress {
   int clusterSize;
   String jobSplit;
 
+  Map<Task, GroomServerStatus> taskToGroomMap;
+  // Used only for scheduling!
+  Map<GroomServerStatus, Integer> tasksInGroomMap;
+
   public JobInProgress(BSPJobID jobId, Path jobFile, BSPMaster master,
       Configuration conf) throws IOException {
     this.conf = conf;
@@ -89,6 +97,10 @@ class JobInProgress {
     this.jobFile = jobFile;
     this.master = master;
 
+    this.taskToGroomMap = new HashMap<Task, GroomServerStatus>(2 * tasks.length);
+
+    this.tasksInGroomMap = new HashMap<GroomServerStatus, Integer>();
+
     this.status = new JobStatus(jobId, null, 0L, 0L,
         JobStatus.State.PREP.value(), counters);
     this.startTime = System.currentTimeMillis();
@@ -231,9 +243,17 @@ class JobInProgress {
     LOG.info("Job is initialized.");
   }
 
-  public synchronized Task obtainNewTask(GroomServerStatus status,
-      int clusterSize) {
-    this.clusterSize = clusterSize;
+  public Iterator<GroomServerStatus> getGroomsForTask() {
+    return null;
+  }
+
+  public GroomServerStatus getGroomStatusForTask(Task t) {
+    return this.taskToGroomMap.get(t);
+  }
+
+  public synchronized Task obtainNewTask(
+      Map<String, GroomServerStatus> groomStatuses) {
+    this.clusterSize = groomStatuses.size();
 
     if (this.status.getRunState() != JobStatus.RUNNING) {
       LOG.info("Cannot create task split for " + profile.getJobID());
@@ -241,10 +261,18 @@ class JobInProgress {
     }
 
     Task result = null;
+
     try {
       for (int i = 0; i < tasks.length; i++) {
         if (!tasks[i].isRunning() && !tasks[i].isComplete()) {
-          result = tasks[i].getTaskToRun(status);
+          result = tasks[i].getTaskToRun(groomStatuses, tasksInGroomMap);
+          if (result != null)
+            this.taskToGroomMap.put(result, tasks[i].getGroomServerStatus());
+          int taskInGroom = 0;
+          if (tasksInGroomMap.containsKey(tasks[i].getGroomServerStatus())) {
+            taskInGroom = tasksInGroomMap.get(tasks[i].getGroomServerStatus());
+          }
+          tasksInGroomMap.put(tasks[i].getGroomServerStatus(), taskInGroom + 1);
           break;
         }
       }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=1309761&r1=1309760&r2=1309761&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java Thu Apr  5 11:21:50 2012
@@ -22,29 +22,34 @@ import static java.util.concurrent.TimeU
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
-import static java.util.concurrent.TimeUnit.*;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.GroomServerStatus;
 import org.apache.hama.ipc.GroomProtocol;
 import org.apache.hama.monitor.Federator;
-import org.apache.hama.monitor.Federator.Act;
-import org.apache.hama.monitor.Federator.CollectorHandler;
 import org.apache.hama.monitor.Metric;
 import org.apache.hama.monitor.MetricsRecord;
 import org.apache.hama.monitor.Monitor;
 import org.apache.hama.monitor.ZKCollector;
+import org.apache.hama.monitor.Federator.Act;
+import org.apache.hama.monitor.Federator.CollectorHandler;
 import org.apache.zookeeper.ZooKeeper;
 
 /**
@@ -108,8 +113,9 @@ class SimpleTaskScheduler extends TaskSc
         // schedule
         Collection<GroomServerStatus> glist = groomServerManager
             .groomServerStatusKeySet();
-        schedule(j, (GroomServerStatus[]) glist
-            .toArray(new GroomServerStatus[glist.size()]));
+        schedule(j,
+            (GroomServerStatus[]) glist.toArray(new GroomServerStatus[glist
+                .size()]));
       }
     }
 
@@ -124,25 +130,47 @@ class SimpleTaskScheduler extends TaskSc
       ClusterStatus clusterStatus = groomServerManager.getClusterStatus(false);
       final int numGroomServers = clusterStatus.getGroomServers();
       final ScheduledExecutorService sched = Executors
-          .newScheduledThreadPool(statuses.length + 5);
-      for (GroomServerStatus status : statuses) {
-        sched
-            .schedule(new TaskWorker(status, numGroomServers, job), 0, SECONDS);
-      }// for
+          .newScheduledThreadPool(1);// statuses.length + 5);
+
+      ScheduledFuture<Boolean> jobScheduleResult = sched.schedule(
+          new TaskWorker(statuses, numGroomServers, job), 0, SECONDS);
+
+      Boolean jobResult = Boolean.FALSE;
+
+      try {
+        jobResult = jobScheduleResult.get();
+      } catch (InterruptedException e) {
+        // TODO Auto-generated catch block
+        jobResult = Boolean.FALSE;
+        LOG.error("Error submitting job", e);
+      } catch (ExecutionException e) {
+        // TODO Auto-generated catch block
+        jobResult = Boolean.FALSE;
+        LOG.error("Error submitting job", e);
+      }
+      if (Boolean.FALSE.equals(jobResult)) {
+        LOG.error(new StringBuffer(512).append("Scheduling of job ")
+            .append(job.getJobName())
+            .append(" could not be done successfully. Killing it!").toString());
+        job.kill();
+      }
     }
   }
 
-  private class TaskWorker implements Runnable {
-    private final GroomServerStatus stus;
+  private class TaskWorker implements Callable<Boolean> {
+    private final Map<String, GroomServerStatus> groomStatuses;
     private final int groomNum;
     private final JobInProgress jip;
 
-    TaskWorker(final GroomServerStatus stus, final int num,
+    TaskWorker(final GroomServerStatus[] stus, final int num,
         final JobInProgress jip) {
-      this.stus = stus;
+      this.groomStatuses = new HashMap<String, GroomServerStatus>(2 * num);
+      for (GroomServerStatus status : stus) {
+        this.groomStatuses.put(status.hostName, status);
+      }
       this.groomNum = num;
       this.jip = jip;
-      if (null == this.stus)
+      if (null == this.groomStatuses)
         throw new NullPointerException("Target groom server is not "
             + "specified.");
       if (-1 == this.groomNum)
@@ -151,35 +179,71 @@ class SimpleTaskScheduler extends TaskSc
         throw new NullPointerException("No job is specified.");
     }
 
-    public void run() {
-      // obtain tasks
-      List<GroomServerAction> actions = new ArrayList<GroomServerAction>();
+    public Boolean call() {
+
+      // Action to be sent for each task to the respective groom server.
+      Map<GroomServerStatus, List<LaunchTaskAction>> actionMap = 
+          new HashMap<GroomServerStatus, List<LaunchTaskAction>>(
+              2 * this.groomStatuses.size());
+      Set<Task> taskSet = new HashSet<Task>(2 * jip.tasks.length);
       Task t = null;
       int cnt = 0;
-      while((t = jip.obtainNewTask(this.stus, groomNum) ) != null) {
-        actions.add(new LaunchTaskAction(t));
-        cnt++;
-
-        if(cnt > (this.stus.getMaxTasks() - 1))
+      while ((t = jip.obtainNewTask(this.groomStatuses)) != null) {
+        taskSet.add(t);
+        // Scheduled all tasks
+        if (++cnt == this.jip.tasks.length) {
           break;
+        }
       }
-      
+
+      // if all tasks could not be scheduled
+      if (cnt != this.jip.tasks.length) {
+        return Boolean.FALSE;
+      }
+
       // assembly into actions
-      // List<Task> tasks = new ArrayList<Task>();
-      if (jip.getStatus().getRunState() == JobStatus.RUNNING) {
-        GroomProtocol worker = groomServerManager.findGroomServer(this.stus);
+      Iterator<Task> taskIter = taskSet.iterator();
+      while (taskIter.hasNext()) {
+        Task task = taskIter.next();
+        GroomServerStatus groomStatus = jip.getGroomStatusForTask(task);
+        List<LaunchTaskAction> taskActions = actionMap.get(groomStatus);
+        if (taskActions == null) {
+          taskActions = new ArrayList<LaunchTaskAction>(
+              groomStatus.getMaxTasks());
+        }
+        taskActions.add(new LaunchTaskAction(task));
+        actionMap.put(groomStatus, taskActions);
+      }
+
+      Iterator<GroomServerStatus> groomIter = actionMap.keySet().iterator();
+      while (jip.getStatus().getRunState() == JobStatus.RUNNING
+          && groomIter.hasNext()) {
+
+        GroomServerStatus groomStatus = groomIter.next();
+        List<LaunchTaskAction> actionList = actionMap.get(groomStatus);
+
+        GroomProtocol worker = groomServerManager.findGroomServer(groomStatus);
         try {
           // dispatch() to the groom server
-          Directive d1 = new DispatchTasksDirective(actions.toArray(new GroomServerAction[0]));
+          GroomServerAction[] actions = new GroomServerAction[actionList.size()];
+          actionList.toArray(actions);
+          Directive d1 = new DispatchTasksDirective(actions);
           worker.dispatch(d1);
         } catch (IOException ioe) {
-          LOG.error("Fail to dispatch tasks to GroomServer "
-              + this.stus.getGroomName(), ioe);
+          LOG.error(
+              "Fail to dispatch tasks to GroomServer "
+                  + groomStatus.getGroomName(), ioe);
         }
-      } else {
+
+      }
+
+      if (groomIter.hasNext()
+          && jip.getStatus().getRunState() != JobStatus.RUNNING) {
         LOG.warn("Currently master only shcedules job in running state. "
             + "This may be refined in the future. JobId:" + jip.getJobID());
       }
+
+      return Boolean.TRUE;
     }
   }
 

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java?rev=1309761&r1=1309760&r2=1309761&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java Thu Apr  5 11:21:50 2012
@@ -18,6 +18,8 @@
 package org.apache.hama.bsp;
 
 import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
@@ -49,6 +51,8 @@ class TaskInProgress {
   private TaskID id;
   private JobInProgress job;
   private int completes = 0;
+  
+  private GroomServerStatus myGroomStatus = null;
 
   // Status
   // private double progress = 0;
@@ -114,7 +118,8 @@ class TaskInProgress {
   /**
    * Return a Task that can be sent to a GroomServer for execution.
    */
-  public Task getTaskToRun(GroomServerStatus status) throws IOException {
+  public Task getTaskToRun(Map<String, GroomServerStatus> grooms, 
+      Map<GroomServerStatus, Integer> tasksInGroomMap) throws IOException {
     Task t = null;
 
     TaskAttemptID taskid = null;
@@ -131,13 +136,42 @@ class TaskInProgress {
     
     String splitClass = null;
     BytesWritable split = null;
+    GroomServerStatus selectedGroom = null;
     if(rawSplit != null){
       splitClass = rawSplit.getClassName();
       split = rawSplit.getBytes();
+      String[] possibleLocations = rawSplit.getLocations();
+      for (int i = 0; i < possibleLocations.length; ++i){
+        String location = possibleLocations[i];
+        GroomServerStatus groom = grooms.get(location);
+        Integer taskInGroom = tasksInGroomMap.get(groom);
+        taskInGroom = (taskInGroom == null)?0:taskInGroom;
+        if(taskInGroom < groom.getMaxTasks() && 
+            location.equals(groom.getGroomHostName())){
+            selectedGroom = groom;
+            t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split);
+            activeTasks.put(taskid, groom.getGroomName());
+            
+            break;
+        }
+      }
+    }
+    //Failed in attempt to get data locality or there was no input split.
+    if(selectedGroom == null){
+      Iterator<String> groomIter = grooms.keySet().iterator();
+      while(groomIter.hasNext()) {
+        GroomServerStatus groom = grooms.get(groomIter.next());
+        Integer taskInGroom = tasksInGroomMap.get(groom);
+        taskInGroom = (taskInGroom == null)?0:taskInGroom;
+        if(taskInGroom < groom.getMaxTasks()){
+          selectedGroom = groom;
+          t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split);
+          activeTasks.put(taskid, groom.getGroomName());
+        }
+      }
     }
     
-    t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split);
-    activeTasks.put(taskid, status.getGroomName());
+    myGroomStatus = selectedGroom;
 
     return t;
   }
@@ -170,6 +204,10 @@ class TaskInProgress {
   public TreeMap<TaskAttemptID, String> getTasks() {
     return activeTasks;
   }
+  
+  public GroomServerStatus getGroomServerStatus(){
+    return myGroomStatus;
+  }
 
   /**
    * Is the Task associated with taskid is the first attempt of the tip?



Mime
View raw message