hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r1079225 - in /hadoop/mapreduce/branches/yahoo-merge/src: contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ contrib/fairscheduler/src/java/org/apache/hadoop/mapred/ java/org/apache/hadoop/mapred/ test/mapred/org/apache/hadoop/ma...
Date Tue, 08 Mar 2011 05:57:53 GMT
Author: omalley
Date: Tue Mar  8 05:57:53 2011
New Revision: 1079225

URL: http://svn.apache.org/viewvc?rev=1079225&view=rev
Log:
commit 15cc571d1458a3955ef66434620c854f3a68250d
Author: Greg Roelofs <roelofs@yahoo-inc.com>
Date:   Fri Dec 17 18:24:44 2010 -0800

    MR-1220 v9 code-review fixes requested by Dick in comment 37 of BZ (all
    trivial, all optional).

Modified:
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestBadRecords.java

Modified: hadoop/mapreduce/branches/yahoo-merge/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1079225&r1=1079224&r2=1079225&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Tue Mar  8 05:57:53 2011
@@ -630,7 +630,7 @@ class CapacityTaskScheduler extends Task
     boolean hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) {
       //Check if job supports speculative map execution first then
       //check if job has speculative maps.
-      return (job.hasSpeculativeMaps() &&
+      return (job.canSpeculateMaps() &&
               hasSpeculativeTask(job.getTasks(TaskType.MAP), tts));
     }
 
@@ -674,9 +674,9 @@ class CapacityTaskScheduler extends Task
 
     @Override
     boolean hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) {
-      //check if the job supports reduce speculative execution first then
+      //check if the job supports speculative reduce execution first then
       //check if the job has speculative tasks.
-      return (job.hasSpeculativeReduces() &&
+      return (job.canSpeculateReduces() &&
               hasSpeculativeTask(job.getTasks(TaskType.REDUCE), tts));
     }
 

Modified: hadoop/mapreduce/branches/yahoo-merge/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java?rev=1079225&r1=1079224&r2=1079225&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java Tue Mar  8 05:57:53 2011
@@ -60,7 +60,7 @@ public class JobSchedulable extends Sche
       TaskInProgress[] tips = (taskType == TaskType.MAP ? 
           job.getTasks(TaskType.MAP) : job.getTasks(TaskType.REDUCE));
       boolean speculationEnabled = (taskType == TaskType.MAP ?
-          job.hasSpeculativeMaps() : job.hasSpeculativeReduces());
+          job.canSpeculateMaps() : job.canSpeculateReduces());
       long time = scheduler.getClock().getTime();
       for (TaskInProgress tip: tips) {
         if (!tip.isComplete()) {

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=1079225&r1=1079224&r2=1079225&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Mar  8 05:57:53 2011
@@ -242,8 +242,8 @@ public class JobInProgress {
   FileSystem fs;
   String user;
   JobID jobId;
-  volatile private boolean hasSpeculativeMaps;
-  volatile private boolean hasSpeculativeReduces;
+  volatile private boolean canSpeculateMaps;
+  volatile private boolean canSpeculateReduces;
   long inputLength = 0;
   
   Counters jobCounters = new Counters();
@@ -344,8 +344,8 @@ public class JobInProgress {
     this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
 
     
-    hasSpeculativeMaps = conf.getMapSpeculativeExecution();
-    hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
+    canSpeculateMaps = conf.getMapSpeculativeExecution();
+    canSpeculateReduces = conf.getReduceSpeculativeExecution();
     this.nonLocalMaps = new LinkedList<TaskInProgress>();
     this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
     this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
@@ -465,8 +465,8 @@ public class JobInProgress {
 
       this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
 
-      hasSpeculativeMaps = conf.getMapSpeculativeExecution();
-      hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
+      canSpeculateMaps = conf.getMapSpeculativeExecution();
+      canSpeculateReduces = conf.getReduceSpeculativeExecution();
       this.maxLevel = jobtracker.getNumTaskCacheLevels();
       this.anyCacheLevel = this.maxLevel + 1;
       this.nonLocalMaps = new LinkedList<TaskInProgress>();
@@ -524,7 +524,7 @@ public class JobInProgress {
     for (int i = 0; i < splits.length; i++) {
       // originally UberTask was a flavor of MapTask (=> maps[0]), but the
       // current design makes it a flavor of ReduceTask instead (=> reduces[0])
-      TaskInProgress map = uberMode? reduces[0] : maps[i];
+      TaskInProgress map = isUber()? reduces[0] : maps[i];
       String[] splitLocations = splits[i].getLocations();
       if (splitLocations.length == 0) {
         nonLocalMaps.add(map);
@@ -688,14 +688,14 @@ public class JobInProgress {
         && (Math.max(memoryPerMap, memoryPerReduce) <= sysMemSizeForUberSlot
             || sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT);
 
-    if (uberMode) {
+    if (isUber()) {
       // save internal details for UI and abort-cleanup
       uberMapTasks = numMapTasks;
       uberReduceTasks = numReduceTasks;
       uberSetupCleanupNeeded = jobSetupCleanupNeeded;
 
       // disable speculation:  makes no sense to speculate an entire job
-      hasSpeculativeMaps = hasSpeculativeReduces = false;
+      canSpeculateMaps = canSpeculateReduces = false;
 
       // This method modifies numMapTasks (-> 0) and numReduceTasks (-> 1)
       // [and actually creates a TIP, not a true Task; latter is created in
@@ -708,14 +708,14 @@ public class JobInProgress {
     jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
     jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
 
-    if (!uberMode) {
+    if (!isUber()) {
       createMapTasks(jobFile.toString(), taskSplitMetaInfo);
     } else {
       // must create this array even if zero elements:
       maps = new TaskInProgress[numMapTasks];
     }
 
-    if (numMapTasks > 0 || (uberMode && uberMapTasks > 0)) {
+    if (numMapTasks > 0 || (isUber() && uberMapTasks > 0)) {
       // this is needed even if all tasks are shielded by uber event horizon
       nonRunningMapCache = createCache(taskSplitMetaInfo, maxLevel);
     }
@@ -723,7 +723,7 @@ public class JobInProgress {
     // set the launch time
     launchTime = JobTracker.getClock().getTime();
 
-    if (!uberMode) {
+    if (!isUber()) {
       createReduceTasks(jobFile.toString());
     }
 
@@ -755,12 +755,12 @@ public class JobInProgress {
     // possible FIXME:  if profile.getJobID() and jobId are same thing, pick one
     JobInitedEvent jie = new JobInitedEvent(
         profile.getJobID(), launchTime, numMapTasks, numReduceTasks,
-        uberMode, uberMapTasks, uberReduceTasks,
+        isUber(), uberMapTasks, uberReduceTasks,
         JobStatus.getJobRunState(JobStatus.PREP));
     jobHistory.logEvent(jie, jobId);
 
     // Log the number of map and reduce tasks
-    if (!uberMode) {
+    if (!isUber()) {
       LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks
                + " map tasks and " + numReduceTasks + " reduce tasks.");
     } else {
@@ -887,7 +887,7 @@ public class JobInProgress {
                        numReduceTasks, jobtracker, conf, this, 1);
     cleanup[1].setJobCleanupTask();
 
-    if (uberMode) {
+    if (isUber()) {
       // ubertasks handle setup internally (as well as cleanup in the normal
       // case), so henceforth we pretend that setup and cleanup aren't needed
       // --unless/until job fails or is killed, in which case a separate
@@ -1327,7 +1327,7 @@ public class JobInProgress {
         this.status.setReduceProgress((float) (this.status.reduceProgress() + 
                                            (progressDelta / reduces.length)));
       }
-      if (uberMode &&
+      if (isUber() &&
           (schedulingInfo == null || schedulingInfo.toString().equals(""))) {
         setSchedulingInfo("");  // force method call so uber info will be added
       }
@@ -1589,8 +1589,8 @@ public class JobInProgress {
                                              int numUniqueHosts,
                                              boolean isMapSlot
                                             ) throws IOException {
-    // uberMode condition should be redundant, but make sure anyway:
-    if (!tasksInited.get() || !jobSetupCleanupNeeded || uberMode) {
+    // isUber() condition should be redundant, but make sure anyway:
+    if (!tasksInited.get() || !jobSetupCleanupNeeded || isUber()) {
       return null;
     }
 
@@ -2179,12 +2179,12 @@ public class JobInProgress {
     return null;
   }
   
-  public boolean hasSpeculativeMaps() {
-    return hasSpeculativeMaps;
+  public boolean canSpeculateMaps() {
+    return canSpeculateMaps;
   }
 
-  public boolean hasSpeculativeReduces() {
-    return hasSpeculativeReduces;
+  public boolean canSpeculateReduces() {
+    return canSpeculateReduces;
   }
 
   /**
@@ -2409,7 +2409,7 @@ public class JobInProgress {
     // II) Running TIP :
     // 
  
-    if (hasSpeculativeMaps) {
+    if (canSpeculateMaps) {
       tip = getSpeculativeMap(taskTrackerName, taskTrackerHost);
       if (tip != null) {
         return tip.getIdWithinJob();
@@ -2497,7 +2497,7 @@ public class JobInProgress {
     }
 
     // 2. check for a reduce tip to be speculated
-    if (hasSpeculativeReduces) {
+    if (canSpeculateReduces) {
       tip = getSpeculativeReduce(taskTrackerName, taskTrackerHost);
       if (tip != null) {
         return tip.getIdWithinJob();
@@ -2832,7 +2832,7 @@ public class JobInProgress {
       runningMapTasks -= 1;
       finishedMapTasks += 1;
       metrics.completeMap(taskid);
-      if (!tip.isJobSetupTask() && hasSpeculativeMaps) {
+      if (!tip.isJobSetupTask() && canSpeculateMaps) {
         updateTaskTrackerStats(tip,ttStatus,trackerMapStats,mapTaskStats);
       }
       // remove the completed map from the resp running caches
@@ -2847,7 +2847,7 @@ public class JobInProgress {
       runningReduceTasks -= 1;
       finishedReduceTasks += 1;
       metrics.completeReduce(taskid);
-      if (!tip.isJobSetupTask() && hasSpeculativeReduces) {
+      if (!tip.isJobSetupTask() && canSpeculateReduces) {
         updateTaskTrackerStats(tip,ttStatus,trackerReduceStats,reduceTaskStats);
       }
       // remove the completed reduces from the running reducers set
@@ -3056,7 +3056,7 @@ public class JobInProgress {
       return;
     }
 
-    if (uberMode) {
+    if (isUber()) {
       // restore setup/cleanup status so separate cleanup task will be launched
       // (see obtainJobCleanupTask())
       jobSetupCleanupNeeded = uberSetupCleanupNeeded;
@@ -3598,7 +3598,7 @@ public class JobInProgress {
   
   public synchronized void setSchedulingInfo(Object schedulingInfo) {
     // UberTasking is a kind of scheduling decision, so we append it here
-    if (uberMode) {
+    if (isUber()) {
       StringBuilder sb = new StringBuilder(256);
       if (schedulingInfo != null && !schedulingInfo.toString().equals("")) {
         sb.append(schedulingInfo).append(" ");

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=1079225&r1=1079224&r2=1079225&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue Mar  8 05:57:53 2011
@@ -769,8 +769,8 @@ class TaskInProgress {
       if (!isCleanupAttempt(taskid)) {
         taskStatuses.put(taskid, status);
         //we don't want to include setup tasks in the task execution stats
-        if (!isJobSetupTask() && ((isMapTask() && job.hasSpeculativeMaps()) || 
-                                  (!isMapTask() && job.hasSpeculativeReduces()))) {
+        if (!isJobSetupTask() && ((isMapTask() && job.canSpeculateMaps()) ||
+                                 (!isMapTask() && job.canSpeculateReduces()))) {
           long now = JobTracker.getClock().getTime();
           double oldProgRate = getOldProgressRate();
           double currProgRate = getCurrentProgressRate(now);

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestBadRecords.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestBadRecords.java?rev=1079225&r1=1079224&r2=1079225&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestBadRecords.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestBadRecords.java Tue Mar  8 05:57:53 2011
@@ -115,25 +115,25 @@ public class TestBadRecords extends Clus
     
     // validate counters (1st assert arg is _expected_ value!)
     Counters counters = runningJob.getCounters();
-    assertEquals(mapperBadRecords.size(),
+    assertEquals("wrong MAP_SKIPPED_RECORDS value", mapperBadRecords.size(),
         counters.findCounter(TaskCounter.MAP_SKIPPED_RECORDS).getCounter());
     
     int mapRecs = input.size() - mapperBadRecords.size();
-    assertEquals(mapRecs,
+    assertEquals("wrong MAP_INPUT_RECORDS value", mapRecs,
         counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getCounter());
-    assertEquals(mapRecs,
+    assertEquals("wrong MAP_OUTPUT_RECORDS value", mapRecs,
         counters.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter());
     
     int redRecs = mapRecs - redBadRecords.size();
-    assertEquals(redBadRecords.size(),
+    assertEquals("wrong REDUCE_SKIPPED_RECORDS value", redBadRecords.size(),
         counters.findCounter(TaskCounter.REDUCE_SKIPPED_RECORDS).getCounter());
-    assertEquals(redBadRecords.size(),
+    assertEquals("wrong REDUCE_SKIPPED_GROUPS value", redBadRecords.size(),
         counters.findCounter(TaskCounter.REDUCE_SKIPPED_GROUPS).getCounter());
-    assertEquals(redRecs,
+    assertEquals("wrong REDUCE_INPUT_GROUPS value", redRecs,
         counters.findCounter(TaskCounter.REDUCE_INPUT_GROUPS).getCounter());
-    assertEquals(redRecs,
+    assertEquals("wrong REDUCE_INPUT_RECORDS value", redRecs,
         counters.findCounter(TaskCounter.REDUCE_INPUT_RECORDS).getCounter());
-    assertEquals(redRecs,
+    assertEquals("wrong REDUCE_OUTPUT_RECORDS value", redRecs,
         counters.findCounter(TaskCounter.REDUCE_OUTPUT_RECORDS).getCounter());
     
     //validate skipped records



Mime
View raw message