hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sha...@apache.org
Subject svn commit: r1130650 - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ mr-...
Date Thu, 02 Jun 2011 16:53:29 GMT
Author: sharad
Date: Thu Jun  2 16:53:28 2011
New Revision: 1130650

URL: http://svn.apache.org/viewvc?rev=1130650&view=rev
Log:
Fix container size rounding in AM and headroom in RM.

Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AMConstants.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1130650&r1=1130649&r2=1130650&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Thu Jun  2 16:53:28 2011
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
 
 
     MAPREDUCE-279
+    Fix container size rounding in AM and headroom in RM. (acmurthy and 
+    sharad) 
 
     More cleaning up constants, removing stale code, and making conspicuous
     the envs that apps depend on to be provided by YARN. (vinodkv)

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AMConstants.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AMConstants.java?rev=1130650&r1=1130649&r2=1130650&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AMConstants.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AMConstants.java Thu Jun  2 16:53:28 2011
@@ -59,4 +59,12 @@ public interface AMConstants {
 
   public static final String RECOVERY_ENABLE = MRConstants.YARN_MR_PREFIX
       + "recovery.enable";
+  
+  public static final float DEFAULT_REDUCE_RAMP_UP_LIMIT = 0.5f;
+  public static final String REDUCE_RAMPUP_UP_LIMIT = MRConstants.YARN_MR_PREFIX
+  + "reduce.rampup.limit";
+  
+  public static final float DEFAULT_REDUCE_PREEMPTION_LIMIT = 0.5f;
+  public static final String REDUCE_PREEMPTION_LIMIT = MRConstants.YARN_MR_PREFIX
+  + "reduce.preemption.limit";
 }

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1130650&r1=1130649&r2=1130650&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Thu Jun  2 16:53:28 2011
@@ -422,6 +422,10 @@ public abstract class TaskAttemptImpl im
   }
 
   private int getMemoryRequired(Configuration conf, TaskType taskType) {
+    //TODO: get this from RM instead of reading the config
+    int slotMemSize = conf.getInt("yarn.capacity-scheduler.minimum-allocation-mb",
+        1024);
+    LOG.info("slotMemSize " + slotMemSize);
     int memory = 1024;
     if (taskType == TaskType.MAP)  {
       memory = conf.getInt(MRJobConfig.MAP_MEMORY_MB, 1024);
@@ -429,6 +433,8 @@ public abstract class TaskAttemptImpl im
       memory = conf.getInt(MRJobConfig.REDUCE_MEMORY_MB, 1024);
     }
     
+    //round off on slotsize
+    memory = (int) Math.ceil((float) memory/slotMemSize) * slotMemSize;
     return memory;
   }
 

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1130650&r1=1130649&r2=1130650&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Thu Jun  2 16:53:28 2011
@@ -40,6 +40,7 @@ import org.apache.hadoop.mapreduce.TypeC
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.AMConstants;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
@@ -63,12 +64,6 @@ public class RMContainerAllocator extend
   public static final 
   float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
   
-  public static final 
-  float DEFAULT_LIMIT_PERCENT_FOR_REDUCE_PREEMPTION = 0.2f;
-  
-  public static final 
-  float DEFAULT_LIMIT_REDUCE_RAMP_UP = 0.5f;
-  
   private static final Priority PRIORITY_FAST_FAIL_MAP;
   private static final Priority PRIORITY_REDUCE;
   private static final Priority PRIORITY_MAP;
@@ -110,9 +105,6 @@ public class RMContainerAllocator extend
   //holds scheduled requests to be fulfilled by RM
   private final ScheduledRequests scheduledRequests = new ScheduledRequests();
   
-  private int slotMemSize = 0;
-  private int completedMapsForReduceSlowstart;
-  
   private int containersAllocated = 0;
   private int containersReleased = 0;
   private int hostLocalAssigned = 0;
@@ -125,19 +117,29 @@ public class RMContainerAllocator extend
   private int reduceResourceReqt;//memory
   private int completedMaps = 0;
   private int completedReduces = 0;
+  
+  private boolean reduceStarted = false;
+  private float maxReduceRampupLimit = 0;
+  private float maxReducePreemptionLimit = 0;
+  private float reduceSlowStart = 0;
 
   public RMContainerAllocator(ClientService clientService, AppContext context) {
     super(clientService, context);
     this.context = context;
   }
-  
-  @Override 
+
+  @Override
   public void init(Configuration conf) {
     super.init(conf);
-    //TODO: this should be received as part of the registration from RM
-    //for now read from config
-    slotMemSize = conf.getInt("yarn.capacity-scheduler.minimum-allocation-mb",
-        1024);
+    reduceSlowStart = conf.getFloat(
+        MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 
+        DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART);
+    maxReduceRampupLimit = conf.getFloat(
+        AMConstants.REDUCE_RAMPUP_UP_LIMIT, 
+        AMConstants.DEFAULT_REDUCE_RAMP_UP_LIMIT);
+    maxReducePreemptionLimit = conf.getFloat(
+        AMConstants.REDUCE_PREEMPTION_LIMIT,
+        AMConstants.DEFAULT_REDUCE_PREEMPTION_LIMIT);
   }
 
   @Override 
@@ -181,14 +183,13 @@ public class RMContainerAllocator extend
       if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) {
         if (mapResourceReqt == 0) {
           mapResourceReqt = reqEvent.getCapability().getMemory();
-          //round off on slotSize
-          mapResourceReqt = (int) Math.ceil(mapResourceReqt/slotMemSize) * slotMemSize;
+          LOG.info("mapResourceReqt:"+mapResourceReqt);
         }
         scheduledRequests.addMap(reqEvent);//maps are immediately scheduled
       } else {
         if (reduceResourceReqt == 0) {
           reduceResourceReqt = reqEvent.getCapability().getMemory();
-          reduceResourceReqt = (int) Math.ceil(reduceResourceReqt/slotMemSize) * slotMemSize;
+          LOG.info("reduceResourceReqt:"+reduceResourceReqt);
         }
         if (reqEvent.getEarlierAttemptFailed()) {
           //add to the front of queue for fail fast
@@ -227,12 +228,18 @@ public class RMContainerAllocator extend
     //unassigned maps
     int memLimit = getMemLimit();
     if (scheduledRequests.maps.size() > 0) {
-      int availableMemForMap = memLimit - (assignedRequests.reduces.size() * reduceResourceReqt -
-          assignedRequests.preemptionWaitingReduces.size() * reduceResourceReqt);
+      int availableMemForMap = memLimit - ((assignedRequests.reduces.size() -
+          assignedRequests.preemptionWaitingReduces.size()) * reduceResourceReqt);
       //availableMemForMap must be sufficient to run atleast 1 map
       if (availableMemForMap < mapResourceReqt) {
-        int premeptionLimit = (int) DEFAULT_LIMIT_PERCENT_FOR_REDUCE_PREEMPTION * memLimit /reduceResourceReqt;
-        int toPreempt = Math.min(scheduledRequests.maps.size(), premeptionLimit);
+        //preempt for making space for atleast one map
+        int premeptionLimit = Math.max(mapResourceReqt - availableMemForMap, 
+            (int) maxReducePreemptionLimit * memLimit);
+        
+        int preemptMem = Math.min(scheduledRequests.maps.size() * mapResourceReqt, 
+            premeptionLimit);
+        
+        int toPreempt = (int) Math.ceil((float) preemptMem/reduceResourceReqt);
         toPreempt = Math.min(toPreempt, assignedRequests.reduces.size());
         
         LOG.info("Going to preempt " + toPreempt);
@@ -251,19 +258,18 @@ public class RMContainerAllocator extend
     
     int totalMaps = assignedRequests.maps.size() + completedMaps + scheduledRequests.maps.size();
     
-    if (completedMapsForReduceSlowstart == 0) {//not set yet
-      completedMapsForReduceSlowstart = 
-        (int)Math.ceil(
-            (getConfig().getFloat(
-                MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 
-                      DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) * 
-                      totalMaps));
-    }
-    
-    if(completedMaps < completedMapsForReduceSlowstart) {
-      LOG.info("Reduce slow start threshold not met. " +
-      		"completedMapsForReduceSlowstart " + completedMapsForReduceSlowstart);
-      return;
+    //check for slow start
+    if (!reduceStarted) {//not set yet
+      int completedMapsForReduceSlowstart = (int)Math.ceil(reduceSlowStart * 
+                      totalMaps);
+      if(completedMaps < completedMapsForReduceSlowstart) {
+        LOG.info("Reduce slow start threshold not met. " +
+              "completedMapsForReduceSlowstart " + completedMapsForReduceSlowstart);
+        return;
+      } else {
+        LOG.info("Reduce slow start threshold reached. Scheduling reduces.");
+        reduceStarted = true;
+      }
     }
     
     int completedMapPercent = 0;
@@ -281,7 +287,7 @@ public class RMContainerAllocator extend
     // ramp up the reduces based on completed map percentage
     int memLimit = getMemLimit();
     reduceMemLimit = Math.min(completedMapPercent * memLimit,
-        (int) DEFAULT_LIMIT_REDUCE_RAMP_UP * memLimit);
+        (int) maxReduceRampupLimit * memLimit);
     mapMemLimit = memLimit - reduceMemLimit;
 
     // check if there aren't enough maps scheduled, give the free map capacity

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1130650&r1=1130649&r2=1130650&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Thu Jun  2 16:53:28 2011
@@ -511,7 +511,10 @@ public class LeafQueue implements Queue 
       application.showRequests();
 
       synchronized (application) {
-        //if (application.)
+        Resource userLimit = 
+          computeUserLimit(application, clusterResource, Resources.none());
+        setUserResourceLimit(application, userLimit);
+        
         for (Priority priority : application.getPriorities()) {
 
           // Do we need containers at this 'priority'?
@@ -522,19 +525,21 @@ public class LeafQueue implements Queue 
           // Are we going over limits by allocating to this application?
           ResourceRequest required = 
             application.getResourceRequest(priority, NodeManager.ANY);
-          if (required != null && required.getNumContainers() > 0) {
+          
 
-            // Maximum Capacity of the queue
-            if (!assignToQueue(clusterResource, required.getCapability())) {
-              return Resources.none();
-            }
-
-            // User limits
-            if (!assignToUser(application, clusterResource, required.getCapability())) {
-              continue; 
-            }
+          // Maximum Capacity of the queue
+          if (!assignToQueue(clusterResource, required.getCapability())) {
+            return Resources.none();
+          }
 
+          // User limits
+          userLimit = 
+            computeUserLimit(application, clusterResource, 
+                required.getCapability());
+          if (!assignToUser(application.getUser(), userLimit)) {
+            continue; 
           }
+
           // Inform the application it is about to get a scheduling opportunity
           application.addSchedulingOpportunity(priority);
           
@@ -619,7 +624,12 @@ public class LeafQueue implements Queue 
     return true;
   }
 
-  private synchronized boolean assignToUser(Application application, 
+  private void setUserResourceLimit(Application application, Resource resourceLimit) {
+    application.setAvailableResourceLimit(resourceLimit);
+    metrics.setAvailableResourcesToUser(application.getUser(), resourceLimit);
+  }
+  
+  private Resource computeUserLimit(Application application, 
       Resource clusterResource, Resource required) {
     // What is our current capacity? 
     // * It is equal to the max(required, queue-capacity) if
@@ -658,14 +668,10 @@ public class LeafQueue implements Queue 
               (int)(queueCapacity * userLimitFactor)
       );
 
-    application.setAvailableResourceLimit(Resources.createResource(limit));
-    metrics.setAvailableResourcesToUser(userName, application.getResourceLimit());
-
-    // Note: We aren't considering the current request since there is a fixed
-    // overhead of the AM, so... 
-    if ((user.getConsumedResources().getMemory()) > limit) {
-      LOG.info("User " + userName + " in queue " + getQueueName() + 
-          " will exceed limit, required: " + required + 
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("User limit computatation for " + userName + 
+          " in queue " + getQueueName() + 
+          " required: " + required + 
           " consumed: " + user.getConsumedResources() + " limit: " + limit +
           " queueCapacity: " + queueCapacity + 
           " qconsumed: " + consumed +
@@ -673,6 +679,25 @@ public class LeafQueue implements Queue 
           " activeUsers: " + activeUsers +
           " clusterCapacity: " + clusterResource.getMemory()
       );
+    }
+    
+    return Resources.createResource(limit);
+  }
+  
+  private synchronized boolean assignToUser(String userName, Resource limit) {
+
+    User user = getUser(userName);
+    
+    // Note: We aren't considering the current request since there is a fixed
+    // overhead of the AM, so... 
+    if ((user.getConsumedResources().getMemory()) > limit.getMemory()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("User " + userName + " in queue " + getQueueName() + 
+            " will exceed limit - " +  
+            " consumed: " + user.getConsumedResources() + 
+            " limit: " + limit
+        );
+      }
       return false;
     }
 



Mime
View raw message