Subject: svn commit: r778700 [1/2] - in /hadoop/core/branches/branch-0.20: ./ conf/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/mapred/ src/mapred/org/apache/hadoop/mapr...
Date: Tue, 26 May 2009 13:35:13 -0000
To: core-commits@hadoop.apache.org
From: yhemanth@apache.org
Reply-To: core-dev@hadoop.apache.org
Message-Id: <20090526133514.085202388897@eris.apache.org>

Author: yhemanth
Date: Tue May 26 13:35:12 2009
New Revision: 778700

URL: http://svn.apache.org/viewvc?rev=778700&view=rev
Log:
HADOOP-5881. Simplify memory monitoring and scheduling related configuration. Contributed by Vinod Kumar Vavilapalli.
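For context, here is a minimal sketch of how the simplified configuration introduced by this commit is meant to be used. The JobTracker property constants and the JobConf setters are taken from the diff below; the concrete megabyte values are illustrative assumptions, and the snippet assumes it compiles inside the org.apache.hadoop.mapred package, where the constants referenced in this commit are visible.

```java
package org.apache.hadoop.mapred; // assumption: needed to see JobTracker's property constants

public class MemoryConfSketch {
  public static void main(String[] args) {
    // Cluster side (set by admins on the JobTracker): a slot size and a
    // per-task upper limit, separately for maps and reduces.
    JobConf clusterConf = new JobConf();
    clusterConf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1024L);       // 1GB per map slot
    clusterConf.setLong(JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1024L);    // 1GB per reduce slot
    clusterConf.setLong(JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, 2048L);   // 2GB ceiling per map task
    clusterConf.setLong(JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, 2048L);

    // Job side: per-task requests in MB, as used throughout the tests below.
    // A request above the slot size makes the task occupy multiple slots.
    JobConf jobConf = new JobConf();
    jobConf.setMemoryForMapTask(2 * 1024L);    // a "high-memory" job: 2GB maps -> two 1GB map slots
    jobConf.setMemoryForReduceTask(1 * 1024L); // normal 1GB reduces
  }
}
```

These four cluster-level values replace the old vmem/pmem machinery (mapred.task.default.maxvmem, the pmem-in-vmem percentage, and their limits) that the rest of this diff removes.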
Added: hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestSubmitJob.java Modified: hadoop/core/branches/branch-0.20/CHANGES.txt hadoop/core/branches/branch-0.20/conf/capacity-scheduler.xml.template hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java hadoop/core/branches/branch-0.20/src/mapred/mapred-default.xml hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobConf.java hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskRunner.java hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTTMemoryReporting.java hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java Modified: hadoop/core/branches/branch-0.20/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=778700&r1=778699&r2=778700&view=diff ============================================================================== --- hadoop/core/branches/branch-0.20/CHANGES.txt (original) +++ hadoop/core/branches/branch-0.20/CHANGES.txt Tue May 26 13:35:12 2009 @@ -7,6 +7,9 @@ HADOOP-5726. Remove pre-emption from capacity scheduler code base. (Rahul Kumar Singh via yhemanth) + HADOOP-5881. Simplify memory monitoring and scheduling related + configuration. (Vinod Kumar Vavilapalli via yhemanth) + NEW FEATURES IMPROVEMENTS Modified: hadoop/core/branches/branch-0.20/conf/capacity-scheduler.xml.template URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/conf/capacity-scheduler.xml.template?rev=778700&r1=778699&r2=778700&view=diff ============================================================================== --- hadoop/core/branches/branch-0.20/conf/capacity-scheduler.xml.template (original) +++ hadoop/core/branches/branch-0.20/conf/capacity-scheduler.xml.template Tue May 26 13:35:12 2009 @@ -56,34 +56,6 @@ account in scheduling decisions by default in a job queue. - - - mapred.capacity-scheduler.task.default-pmem-percentage-in-vmem - -1 - A percentage (float) of the default VM limit for jobs - (mapred.task.default.maxvm). This is the default RAM task-limit - associated with a task. Unless overridden by a job's setting, this - number defines the RAM task-limit. - - If this property is missing, or set to an invalid value, scheduling - based on physical memory, RAM, is disabled. - - - - - mapred.capacity-scheduler.task.limit.maxpmem - -1 - Configuration that provides an upper limit on the maximum - physical memory that can be specified by a job. The job configuration - mapred.task.maxpmem should be less than this value. If not, the job will - be rejected by the scheduler. 
- - If it is set to -1, scheduler will not consider physical memory for - scheduling even if virtual memory based scheduling is enabled(by setting - valid values for both mapred.task.default.maxvmem and - mapred.task.limit.maxvmem). - - mapred.capacity-scheduler.default-minimum-user-limit-percent Modified: hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=778700&r1=778699&r2=778700&view=diff ============================================================================== --- hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (original) +++ hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Tue May 26 13:35:12 2009 @@ -351,44 +351,4 @@ rmConf.setInt( "mapred.capacity-scheduler.init-worker-threads", poolSize); } - - /** - * Get the upper limit on the maximum physical memory that can be specified by - * a job. - * - * @return upper limit for max pmem for tasks. - */ - public long getLimitMaxPmemForTasks() { - return rmConf.getLong(UPPER_LIMIT_ON_TASK_PMEM_PROPERTY, - JobConf.DISABLED_MEMORY_LIMIT); - } - - /** - * Get the upper limit on the maximum physical memory that can be specified by - * a job. - * - * @param value - */ - public void setLimitMaxPmemForTasks(long value) { - rmConf.setLong(UPPER_LIMIT_ON_TASK_PMEM_PROPERTY, value); - } - - /** - * Get cluster-wide default percentage of pmem in vmem. - * - * @return cluster-wide default percentage of pmem in vmem. - */ - public float getDefaultPercentOfPmemInVmem() { - return rmConf.getFloat(DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY, - JobConf.DISABLED_MEMORY_LIMIT); - } - - /** - * Set cluster-wide default percentage of pmem in vmem. - * - * @param value - */ - public void setDefaultPercentOfPmemInVmem(float value) { - rmConf.setFloat(DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY, value); - } } Modified: hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=778700&r1=778699&r2=778700&view=diff ============================================================================== --- hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original) +++ hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Tue May 26 13:35:12 2009 @@ -278,11 +278,7 @@ /** our TaskScheduler object */ protected CapacityTaskScheduler scheduler; - // can be replaced with a global type, if we have one - protected static enum TYPE { - MAP, REDUCE - } - protected TYPE type = null; + protected CapacityTaskScheduler.TYPE type = null; abstract Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) throws IOException; @@ -413,7 +409,8 @@ //If this job meets memory requirements. Ask the JobInProgress for //a task to be scheduled on the task tracker. //if we find a job then we pass it on. 
- if (scheduler.memoryMatcher.matchesMemoryRequirements(j, taskTracker)) { + if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type, + taskTracker)) { // We found a suitable job. Get task from it. Task t = obtainNewTask(taskTracker, j); //if there is a task return it immediately. @@ -422,6 +419,8 @@ return TaskLookupResult.getTaskFoundResult(t); } else { //skip to the next job in the queue. + LOG.debug("Job " + j.getJobID().toString() + + " returned no tasks of type " + type); continue; } } else { @@ -456,7 +455,8 @@ if (j.getStatus().getRunState() != JobStatus.RUNNING) { continue; } - if (scheduler.memoryMatcher.matchesMemoryRequirements(j, taskTracker)) { + if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type, + taskTracker)) { // We found a suitable job. Get task from it. Task t = obtainNewTask(taskTracker, j); //if there is a task return it immediately. @@ -561,7 +561,7 @@ private static class MapSchedulingMgr extends TaskSchedulingMgr { MapSchedulingMgr(CapacityTaskScheduler dad) { super(dad); - type = TaskSchedulingMgr.TYPE.MAP; + type = CapacityTaskScheduler.TYPE.MAP; queueComparator = mapComparator; } Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) @@ -603,7 +603,7 @@ private static class ReduceSchedulingMgr extends TaskSchedulingMgr { ReduceSchedulingMgr(CapacityTaskScheduler dad) { super(dad); - type = TaskSchedulingMgr.TYPE.REDUCE; + type = CapacityTaskScheduler.TYPE.REDUCE; queueComparator = reduceComparator; } Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) @@ -664,13 +664,18 @@ return System.currentTimeMillis(); } } + // can be replaced with a global type, if we have one + protected static enum TYPE { + MAP, REDUCE + } + private Clock clock; private JobInitializationPoller initializationPoller; - long limitMaxVmemForTasks; - long limitMaxPmemForTasks; - long defaultMaxVmPerTask; - float defaultPercentOfPmemInVmem; + private long memSizeForMapSlotOnJT; + private long memSizeForReduceSlotOnJT; + private long limitMaxMemForMapTasks; + private long limitMaxMemForReduceTasks; public CapacityTaskScheduler() { this(new Clock()); @@ -687,37 +692,45 @@ this.schedConf = conf; } - /** - * Normalize the negative values in configuration - * - * @param val - * @return normalized value - */ - private long normalizeMemoryConfigValue(long val) { - if (val < 0) { - val = JobConf.DISABLED_MEMORY_LIMIT; - } - return val; - } - private void initializeMemoryRelatedConf() { - limitMaxVmemForTasks = - normalizeMemoryConfigValue(conf.getLong( - JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, + memSizeForMapSlotOnJT = + JobConf.normalizeMemoryConfigValue(conf.getLong( + JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, + JobConf.DISABLED_MEMORY_LIMIT)); + memSizeForReduceSlotOnJT = + JobConf.normalizeMemoryConfigValue(conf.getLong( + JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, JobConf.DISABLED_MEMORY_LIMIT)); + limitMaxMemForMapTasks = + JobConf.normalizeMemoryConfigValue(conf.getLong( + JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, + JobConf.DISABLED_MEMORY_LIMIT)); + limitMaxMemForReduceTasks = + JobConf.normalizeMemoryConfigValue(conf.getLong( + JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, + JobConf.DISABLED_MEMORY_LIMIT)); + LOG.info(new StringBuilder().append("Scheduler configured with ").append( + "(memSizeForMapSlotOnJT, memSizeForReduceSlotOnJT,").append( + " limitMaxMemForMapTasks, limitMaxMemForReduceTasks)").append( + memSizeForMapSlotOnJT).append(", ").append(memSizeForReduceSlotOnJT) + .append(", 
").append(limitMaxMemForMapTasks).append(", ").append( + limitMaxMemForReduceTasks).append(")")); + } - limitMaxPmemForTasks = - normalizeMemoryConfigValue(schedConf.getLimitMaxPmemForTasks()); + long getMemSizeForMapSlot() { + return memSizeForMapSlotOnJT; + } - defaultMaxVmPerTask = - normalizeMemoryConfigValue(conf.getLong( - JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY, - JobConf.DISABLED_MEMORY_LIMIT)); + long getMemSizeForReduceSlot() { + return memSizeForReduceSlotOnJT; + } - defaultPercentOfPmemInVmem = schedConf.getDefaultPercentOfPmemInVmem(); - if (defaultPercentOfPmemInVmem < 0) { - defaultPercentOfPmemInVmem = JobConf.DISABLED_MEMORY_LIMIT; - } + long getLimitMaxMemForMapSlot() { + return limitMaxMemForMapTasks; + } + + long getLimitMaxMemForReduceSlot() { + return limitMaxMemForReduceTasks; } @Override @@ -955,14 +968,12 @@ // found a task; return return Collections.singletonList(tlr.getTask()); } - else if (TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT == - tlr.getLookUpStatus()) { - // return no task - return null; - } // if we didn't get any, look at map tasks, if TT has space - else if ((TaskLookupResult.LookUpStatus.NO_TASK_FOUND == - tlr.getLookUpStatus()) && (maxMapTasks > currentMapTasks)) { + else if ((TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT + == tlr.getLookUpStatus() || + TaskLookupResult.LookUpStatus.NO_TASK_FOUND + == tlr.getLookUpStatus()) + && (maxMapTasks > currentMapTasks)) { mapScheduler.updateCollectionOfQSIs(); tlr = mapScheduler.assignTasks(taskTracker); if (TaskLookupResult.LookUpStatus.TASK_FOUND == @@ -980,13 +991,12 @@ // found a task; return return Collections.singletonList(tlr.getTask()); } - else if (TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT == - tlr.getLookUpStatus()) { - return null; - } // if we didn't get any, look at reduce tasks, if TT has space - else if ((TaskLookupResult.LookUpStatus.NO_TASK_FOUND == - tlr.getLookUpStatus()) && (maxReduceTasks > currentReduceTasks)) { + else if ((TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT + == tlr.getLookUpStatus() + || TaskLookupResult.LookUpStatus.NO_TASK_FOUND + == tlr.getLookUpStatus()) + && (maxReduceTasks > currentReduceTasks)) { reduceScheduler.updateCollectionOfQSIs(); tlr = reduceScheduler.assignTasks(taskTracker); if (TaskLookupResult.LookUpStatus.TASK_FOUND == @@ -999,38 +1009,6 @@ return null; } - /** - * Kill the job if it has invalid requirements and return why it is killed - * - * @param job - * @return string mentioning why the job is killed. Null if the job has valid - * requirements. - */ - private String killJobIfInvalidRequirements(JobInProgress job) { - if (!memoryMatcher.isSchedulingBasedOnVmemEnabled()) { - return null; - } - if ((job.getMaxVirtualMemoryForTask() > limitMaxVmemForTasks) - || (memoryMatcher.isSchedulingBasedOnPmemEnabled() && (job - .getMaxPhysicalMemoryForTask() > limitMaxPmemForTasks))) { - String msg = - job.getJobID() + " (" + job.getMaxVirtualMemoryForTask() + "vmem, " - + job.getMaxPhysicalMemoryForTask() - + "pmem) exceeds the cluster's max-memory-limits (" - + limitMaxVmemForTasks + "vmem, " + limitMaxPmemForTasks - + "pmem). Cannot run in this cluster, so killing it."; - LOG.warn(msg); - try { - taskTrackerManager.killJob(job.getJobID()); - return msg; - } catch (IOException ioe) { - LOG.warn("Failed to kill the job " + job.getJobID() + ". 
Reason : " - + StringUtils.stringifyException(ioe)); - } - } - return null; - } - // called when a job is added synchronized void jobAdded(JobInProgress job) throws IOException { QueueSchedulingInfo qsi = @@ -1050,13 +1028,6 @@ qsi.numJobsByUser.put(job.getProfile().getUser(), i); LOG.debug("Job " + job.getJobID().toString() + " is added under user " + job.getProfile().getUser() + ", user now has " + i + " jobs"); - - // Kill the job if it cannot run in the cluster because of invalid - // resource requirements. - String statusMsg = killJobIfInvalidRequirements(job); - if (statusMsg != null) { - throw new IOException(statusMsg); - } } // called when a job completes Modified: hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java?rev=778700&r1=778699&r2=778700&view=diff ============================================================================== --- hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java (original) +++ hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java Tue May 26 13:35:12 2009 @@ -30,111 +30,33 @@ this.scheduler = capacityTaskScheduler; } - boolean isSchedulingBasedOnVmemEnabled() { - LOG.debug("defaultMaxVmPerTask : " + scheduler.defaultMaxVmPerTask - + " limitMaxVmemForTasks : " + scheduler.limitMaxVmemForTasks); - if (scheduler.defaultMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT - || scheduler.limitMaxVmemForTasks == JobConf.DISABLED_MEMORY_LIMIT) { + boolean isSchedulingBasedOnMemEnabled() { + if (scheduler.getLimitMaxMemForMapSlot() + == JobConf.DISABLED_MEMORY_LIMIT + || scheduler.getLimitMaxMemForReduceSlot() + == JobConf.DISABLED_MEMORY_LIMIT + || scheduler.getMemSizeForMapSlot() + == JobConf.DISABLED_MEMORY_LIMIT + || scheduler.getMemSizeForReduceSlot() + == JobConf.DISABLED_MEMORY_LIMIT) { return false; } return true; } - boolean isSchedulingBasedOnPmemEnabled() { - LOG.debug("defaultPercentOfPmemInVmem : " - + scheduler.defaultPercentOfPmemInVmem + " limitMaxPmemForTasks : " - + scheduler.limitMaxPmemForTasks); - if (scheduler.defaultPercentOfPmemInVmem == JobConf.DISABLED_MEMORY_LIMIT - || scheduler.limitMaxPmemForTasks == JobConf.DISABLED_MEMORY_LIMIT) { - return false; - } - return true; - } - - /** - * Obtain the virtual memory allocated for a job's tasks. - * - * If the job has a configured value for the max-virtual memory, that will be - * returned. Else, the cluster-wide default max-virtual memory for tasks is - * returned. - * - * This method can only be called after - * {@link CapacityTaskScheduler#initializeMemoryRelatedConf()} is invoked. - * - * @param jConf JobConf of the job - * @return the virtual memory allocated for the job's tasks. - */ - private long getVirtualMemoryForTask(JobConf jConf) { - long vMemForTask = jConf.getMaxVirtualMemoryForTask(); - if (vMemForTask == JobConf.DISABLED_MEMORY_LIMIT) { - vMemForTask = - new JobConf().getLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY, - scheduler.defaultMaxVmPerTask); - } - return vMemForTask; - } - - /** - * Obtain the physical memory allocated for a job's tasks. - * - * If the job has a configured value for the max physical memory, that - * will be returned. Else, the cluster-wide default physical memory for - * tasks is returned. 
- * - * This method can only be called after - * {@link CapacityTaskScheduler#initializeMemoryRelatedConf()} is invoked. - * - * @param jConf JobConf of the job - * @return the physical memory allocated for the job's tasks - */ - private long getPhysicalMemoryForTask(JobConf jConf) { - long pMemForTask = jConf.getMaxPhysicalMemoryForTask(); - if (pMemForTask == JobConf.DISABLED_MEMORY_LIMIT) { - pMemForTask = - Math.round(getVirtualMemoryForTask(jConf) - * scheduler.defaultPercentOfPmemInVmem); - } - return pMemForTask; - } - - static class Memory { - long vmem; - long pmem; - - Memory(long vm, long pm) { - this.vmem = vm; - this.pmem = pm; - } - } - /** * Find the memory that is already used by all the running tasks * residing on the given TaskTracker. * * @param taskTracker + * @param taskType * @return amount of memory that is used by the residing tasks, * null if memory cannot be computed for some reason. */ - private synchronized Memory getMemReservedForTasks( - TaskTrackerStatus taskTracker) { - boolean disabledVmem = false; - boolean disabledPmem = false; - - if (scheduler.defaultMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT) { - disabledVmem = true; - } - - if (scheduler.defaultPercentOfPmemInVmem == JobConf.DISABLED_MEMORY_LIMIT) { - disabledPmem = true; - } - - if (disabledVmem && disabledPmem) { - return new Memory(JobConf.DISABLED_MEMORY_LIMIT, - JobConf.DISABLED_MEMORY_LIMIT); - } - + private synchronized Long getMemReservedForTasks( + TaskTrackerStatus taskTracker, CapacityTaskScheduler.TYPE taskType) { long vmem = 0; - long pmem = 0; + long myVmem = 0; for (TaskStatus task : taskTracker.getTaskReports()) { // the following task states are one in which the slot is @@ -142,12 +64,12 @@ // accounted in used memory. if ((task.getRunState() == TaskStatus.State.RUNNING) || (task.getRunState() == TaskStatus.State.COMMIT_PENDING)) { - JobInProgress job = scheduler.taskTrackerManager.getJob( - task.getTaskID().getJobID()); + JobInProgress job = + scheduler.taskTrackerManager.getJob(task.getTaskID().getJobID()); if (job == null) { // This scenario can happen if a job was completed/killed - // and retired from JT's memory. In this state, we can ignore - // the running task status and compute memory for the rest of + // and retired from JT's memory. In this state, we can ignore + // the running task status and compute memory for the rest of // the tasks. However, any scheduling done with this computation // could result in over-subscribing of memory for tasks on this // TT (as the unaccounted for task is still running). @@ -155,123 +77,98 @@ // One of the ways of doing that is to return null from here // and check for null in the calling method. LOG.info("Task tracker: " + taskTracker.getHost() + " is reporting " - + "a running / commit pending task: " + task.getTaskID() - + " but no corresponding job was found. " - + "Maybe job was retired. Not computing " - + "memory values for this TT."); + + "a running / commit pending task: " + task.getTaskID() + + " but no corresponding job was found. " + + "Maybe job was retired. 
Not computing " + + "memory values for this TT."); return null; } - - JobConf jConf = - scheduler.taskTrackerManager.getJob(task.getTaskID().getJobID()) - .getJobConf(); - if (!disabledVmem) { - vmem += getVirtualMemoryForTask(jConf); - } - if (!disabledPmem) { - pmem += getPhysicalMemoryForTask(jConf); + + JobConf jConf = job.getJobConf(); + + // Get the memory "allotted" for this task by rounding off the job's + // tasks' memory limits to the nearest multiple of the slot-memory-size + // set on JT. This essentially translates to tasks of a high memory job + // using multiple slots. + if (task.getIsMap() && taskType.equals(CapacityTaskScheduler.TYPE.MAP)) { + myVmem = jConf.getMemoryForMapTask(); + myVmem = + (long) (scheduler.getMemSizeForMapSlot() * Math + .ceil((float) myVmem + / (float) scheduler.getMemSizeForMapSlot())); + } else if (!task.getIsMap() + && taskType.equals(CapacityTaskScheduler.TYPE.REDUCE)) { + myVmem = jConf.getMemoryForReduceTask(); + myVmem = + (long) (scheduler.getMemSizeForReduceSlot() * Math + .ceil((float) myVmem + / (float) scheduler.getMemSizeForReduceSlot())); } + vmem += myVmem; } } - return new Memory(vmem, pmem); + return Long.valueOf(vmem); } /** - * Check if a TT has enough pmem and vmem to run this job. + * Check if a TT has enough memory to run of task specified from this job. * @param job + * @param taskType * @param taskTracker * @return true if this TT has enough memory for this job. False otherwise. */ boolean matchesMemoryRequirements(JobInProgress job, - TaskTrackerStatus taskTracker) { + CapacityTaskScheduler.TYPE taskType, TaskTrackerStatus taskTracker) { - // ////////////// vmem based scheduling - if (!isSchedulingBasedOnVmemEnabled()) { - LOG.debug("One of the configuration parameters defaultMaxVmPerTask " - + "and limitMaxVmemPerTasks is not configured. Scheduling based " - + "on job's memory requirements is disabled, ignoring any value " - + "set by job."); - return true; - } - - TaskTrackerStatus.ResourceStatus resourceStatus = - taskTracker.getResourceStatus(); - long totalVMemOnTT = resourceStatus.getTotalVirtualMemory(); - long reservedVMemOnTT = resourceStatus.getReservedTotalMemory(); - - if (totalVMemOnTT == JobConf.DISABLED_MEMORY_LIMIT - || reservedVMemOnTT == JobConf.DISABLED_MEMORY_LIMIT) { - return true; - } + LOG.debug("Matching memory requirements of " + job.getJobID().toString() + + " for scheduling on " + taskTracker.trackerName); - if (reservedVMemOnTT > totalVMemOnTT) { + if (!isSchedulingBasedOnMemEnabled()) { + LOG.debug("Scheduling based on job's memory requirements is disabled." + + " Ignoring any value set by job."); return true; } - long jobVMemForTask = job.getMaxVirtualMemoryForTask(); - if (jobVMemForTask == JobConf.DISABLED_MEMORY_LIMIT) { - jobVMemForTask = scheduler.defaultMaxVmPerTask; - } - - Memory memReservedForTasks = getMemReservedForTasks(taskTracker); - if (memReservedForTasks == null) { + Long memUsedOnTT = getMemReservedForTasks(taskTracker, taskType); + if (memUsedOnTT == null) { // For some reason, maybe because we could not find the job // corresponding to a running task (as can happen if the job // is retired in between), we could not compute the memory state // on this TT. Treat this as an error, and fail memory // requirements. - LOG.info("Could not compute memory for taskTracker: " - + taskTracker.getHost() + ". Failing memory requirements."); + LOG.info("Could not compute memory for taskTracker: " + + taskTracker.getHost() + ". 
Failing memory requirements."); return false; } - long vmemUsedOnTT = memReservedForTasks.vmem; - long pmemUsedOnTT = memReservedForTasks.pmem; - long freeVmemUsedOnTT = totalVMemOnTT - vmemUsedOnTT - reservedVMemOnTT; + long totalMemUsableOnTT = 0; - if (jobVMemForTask > freeVmemUsedOnTT) { + long memForThisTask = 0; + if (taskType.equals(CapacityTaskScheduler.TYPE.MAP)) { + memForThisTask = job.getJobConf().getMemoryForMapTask(); + totalMemUsableOnTT = + scheduler.getMemSizeForMapSlot() * taskTracker.getMaxMapTasks(); + } else if (taskType.equals(CapacityTaskScheduler.TYPE.REDUCE)) { + memForThisTask = job.getJobConf().getMemoryForReduceTask(); + totalMemUsableOnTT = + scheduler.getMemSizeForReduceSlot() + * taskTracker.getMaxReduceTasks(); + } + + long freeMemOnTT = totalMemUsableOnTT - memUsedOnTT.longValue(); + if (memForThisTask > freeMemOnTT) { + LOG.debug("memForThisTask (" + memForThisTask + ") > freeMemOnTT (" + + freeMemOnTT + "). A " + taskType + " task from " + + job.getJobID().toString() + " cannot be scheduled on TT " + + taskTracker.trackerName); return false; } - // ////////////// pmem based scheduling - - long totalPmemOnTT = resourceStatus.getTotalPhysicalMemory(); - long reservedPmemOnTT = resourceStatus.getReservedPhysicalMemory(); - long jobPMemForTask = job.getMaxPhysicalMemoryForTask(); - long freePmemUsedOnTT = 0; - - if (isSchedulingBasedOnPmemEnabled()) { - if (totalPmemOnTT == JobConf.DISABLED_MEMORY_LIMIT - || reservedPmemOnTT == JobConf.DISABLED_MEMORY_LIMIT) { - return true; - } - - if (reservedPmemOnTT > totalPmemOnTT) { - return true; - } - - if (jobPMemForTask == JobConf.DISABLED_MEMORY_LIMIT) { - jobPMemForTask = - Math.round(jobVMemForTask * scheduler.defaultPercentOfPmemInVmem); - } - - freePmemUsedOnTT = totalPmemOnTT - pmemUsedOnTT - reservedPmemOnTT; - - if (jobPMemForTask > freePmemUsedOnTT) { - return false; - } - } else { - LOG.debug("One of the configuration parameters " - + "defaultPercentOfPmemInVmem and limitMaxPmemPerTasks is not " - + "configured. Scheduling based on job's physical memory " - + "requirements is disabled, ignoring any value set by job."); - } - - LOG.debug("freeVMemOnTT = " + freeVmemUsedOnTT + " totalVMemOnTT = " - + totalVMemOnTT + " freePMemOnTT = " + freePmemUsedOnTT - + " totalPMemOnTT = " + totalPmemOnTT + " jobVMemForTask = " - + jobVMemForTask + " jobPMemForTask = " + jobPMemForTask); + LOG.debug("memForThisTask = " + memForThisTask + ". freeMemOnTT = " + + freeMemOnTT + ". 
A " + taskType.toString() + " task from " + + job.getJobID().toString() + " matches memory requirements on TT " + + taskTracker.trackerName); return true; } } Modified: hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=778700&r1=778699&r2=778700&view=diff ============================================================================== --- hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original) +++ hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Tue May 26 13:35:12 2009 @@ -182,8 +182,6 @@ } mapTaskCtr = 0; redTaskCtr = 0; - super.setMaxVirtualMemoryForTask(jobConf.getMaxVirtualMemoryForTask()); - super.setMaxPhysicalMemoryForTask(jobConf.getMaxPhysicalMemoryForTask()); } @Override @@ -232,7 +230,7 @@ } return task; } - + @Override public Task obtainNewReduceTask(final TaskTrackerStatus tts, int clusterSize, int ignored) throws IOException { @@ -727,7 +725,7 @@ private FakeJobInProgress submitJob(int state, JobConf jobConf) throws IOException { FakeJobInProgress job = new FakeJobInProgress(new JobID("test", ++jobCounter), - (jobConf == null ? new JobConf() : jobConf), taskTrackerManager, + (jobConf == null ? new JobConf(conf) : jobConf), taskTrackerManager, jobConf.getUser()); job.getStatus().setRunState(state); taskTrackerManager.submitJob(job); @@ -1498,12 +1496,6 @@ LOG.debug("Starting the scheduler."); taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1); - // Limited TT - 1GB vmem and 512MB pmem - taskTrackerManager.getTaskTracker("tt1").getResourceStatus() - .setTotalVirtualMemory(1 * 1024 * 1024 * 1024L); - taskTrackerManager.getTaskTracker("tt1").getResourceStatus() - .setTotalPhysicalMemory(512 * 1024 * 1024L); - taskTrackerManager.addQueues(new String[] { "default" }); ArrayList queues = new ArrayList(); queues.add(new FakeQueueInfo("default", 100.0f, true, 25)); @@ -1513,11 +1505,11 @@ // memory-based scheduling disabled by default. scheduler.start(); - LOG.debug("Submit one high memory(3GB vmem, 1GBpmem) job of 1 map task " - + "and 1 reduce task."); + LOG.debug("Submit one high memory job of 1 3GB map task " + + "and 1 1GB reduce task."); JobConf jConf = new JobConf(); - jConf.setMaxVirtualMemoryForTask(3 * 1024 * 1024 * 1024L); // 3GB vmem - jConf.setMaxPhysicalMemoryForTask(1 * 1024 * 1024 * 1024L); // 1 GB pmem + jConf.setMemoryForMapTask(3 * 1024L); // 3GB + jConf.setMemoryForReduceTask(1 * 1024L); // 1 GB jConf.setNumMapTasks(1); jConf.setNumReduceTasks(1); jConf.setQueueName("default"); @@ -1532,197 +1524,57 @@ } /** - * Test to verify that highPmemJobs are scheduled like all other jobs when - * physical-memory based scheduling is not enabled. 
- * @throws IOException - */ - public void testDisabledPmemBasedScheduling() - throws IOException { - - LOG.debug("Starting the scheduler."); - taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1); - - // Limited TT - 100GB vmem and 500MB pmem - TaskTrackerStatus.ResourceStatus ttStatus = - taskTrackerManager.getTaskTracker("tt1").getResourceStatus(); - ttStatus.setTotalVirtualMemory(100 * 1024 * 1024 * 1024L); - ttStatus.setReservedVirtualMemory(0); - ttStatus.setTotalPhysicalMemory(500 * 1024 * 1024L); - ttStatus.setReservedPhysicalMemory(0); - - taskTrackerManager.addQueues(new String[] { "default" }); - ArrayList queues = new ArrayList(); - queues.add(new FakeQueueInfo("default", 100.0f, true, 25)); - resConf.setFakeQueues(queues); - scheduler.setResourceManagerConf(resConf); - scheduler.setTaskTrackerManager(taskTrackerManager); - // enable vmem-based scheduling. pmem based scheduling disabled by default. - scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY, - 1536 * 1024 * 1024L); - scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, - 3 * 1024 * 1024 * 1024L); - scheduler.start(); - - LOG.debug("Submit one high pmem(3GB vmem, 1GBpmem) job of 1 map task " - + "and 1 reduce task."); - JobConf jConf = new JobConf(); - jConf.setMaxVirtualMemoryForTask(3 * 1024 * 1024 * 1024L); // 3GB vmem - jConf.setMaxPhysicalMemoryForTask(1 * 1024 * 1024 * 1024L); // 1 GB pmem - jConf.setNumMapTasks(1); - jConf.setNumReduceTasks(1); - jConf.setQueueName("default"); - jConf.setUser("u1"); - submitJobAndInit(JobStatus.RUNNING, jConf); - - // assert that all tasks are launched even though they transgress the - // scheduling limits. - - checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); - checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1"); - } - - /** - * Test HighMemoryJobs. 
- * @throws IOException - */ - public void testHighMemoryJobs() - throws IOException { - - LOG.debug("Starting the scheduler."); - taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1); - - TaskTrackerStatus.ResourceStatus ttStatus = - taskTrackerManager.getTaskTracker("tt1").getResourceStatus(); - ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L); - ttStatus.setReservedVirtualMemory(0); - ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024L); - ttStatus.setReservedPhysicalMemory(0); - // Normal job on this TT would be 1.5GB vmem, 0.5GB pmem - - taskTrackerManager.addQueues(new String[] { "default" }); - ArrayList queues = new ArrayList(); - queues.add(new FakeQueueInfo("default", 100.0f, true, 25)); - resConf.setFakeQueues(queues); - scheduler.setTaskTrackerManager(taskTrackerManager); - // enabled memory-based scheduling - scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY, - 1536 * 1024 * 1024L); - scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, - 3 * 1024 * 1024 * 1024L); - resConf.setDefaultPercentOfPmemInVmem(33.3f); - resConf.setLimitMaxPmemForTasks(1 * 1024 * 1024 * 1024L); - scheduler.setResourceManagerConf(resConf); - scheduler.start(); - - LOG.debug("Submit one high memory(1600MB vmem, 400MB pmem) job of " - + "1 map task and 1 reduce task."); - JobConf jConf = new JobConf(); - jConf.setMaxVirtualMemoryForTask(1600 * 1024 * 1024L); // 1.6GB vmem - jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem - jConf.setNumMapTasks(1); - jConf.setNumReduceTasks(1); - jConf.setQueueName("default"); - jConf.setUser("u1"); - jConf.setMapSpeculativeExecution(false); - jConf.setReduceSpeculativeExecution(false); - FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf); - checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); - - // No more tasks of this job can run on the TT because of lack of vmem - assertNull(scheduler.assignTasks(tracker("tt1"))); - - // Let attempt_test_0001_m_000001_0 finish, task assignment should succeed. - taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", job1); - checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1"); - - LOG.debug("Submit another high memory(1200MB vmem, 800MB pmem) job of " - + "1 map task and 0 reduces."); - jConf.setMaxVirtualMemoryForTask(1200 * 1024 * 1024L); - jConf.setMaxPhysicalMemoryForTask(800 * 1024 * 1024L); - jConf.setNumMapTasks(1); - jConf.setNumReduceTasks(0); - jConf.setQueueName("default"); - jConf.setUser("u1"); - jConf.setMapSpeculativeExecution(false); - jConf.setReduceSpeculativeExecution(false); - submitJobAndInit(JobStatus.PREP, jConf); // job2 - - // This job shouldn't run the TT now because of lack of pmem - assertNull(scheduler.assignTasks(tracker("tt1"))); - - // Let attempt_test_0001_m_000002_0 finish, task assignment should succeed. - taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0", job1); - checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1"); - - LOG.debug("Submit a normal memory(200MB vmem, 100MB pmem) job of " - + "0 maps and 1 reduce task."); - jConf.setMaxVirtualMemoryForTask(200 * 1024 * 1024L); - jConf.setMaxPhysicalMemoryForTask(100 * 1024 * 1024L); - jConf.setNumMapTasks(0); - jConf.setNumReduceTasks(1); - jConf.setQueueName("default"); - jConf.setUser("u1"); - submitJobAndInit(JobStatus.PREP, jConf); // job3 - - checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1"); - } - - /** - * Test HADOOP-4979. 
- * Bug fix for making sure we always return null to TT if there is a - * high-mem job, and not look at reduce jobs (if map tasks are high-mem) - * or vice-versa. + * Test reverting HADOOP-4979. If there is a high-mem job, we should now look + * at reduce jobs (if map tasks are high-mem) or vice-versa. + * * @throws IOException */ - public void testHighMemoryBlocking() + public void testHighMemoryBlockingAcrossTaskTypes() throws IOException { // 2 map and 1 reduce slots taskTrackerManager = new FakeTaskTrackerManager(1, 2, 1); - TaskTrackerStatus.ResourceStatus ttStatus = - taskTrackerManager.getTaskTracker("tt1").getResourceStatus(); - ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L); - ttStatus.setReservedVirtualMemory(0); - ttStatus.setTotalPhysicalMemory(1536 * 1024 * 1024L); - ttStatus.setReservedPhysicalMemory(0); - // Normal job on this TT would be 1GB vmem, 0.5GB pmem - taskTrackerManager.addQueues(new String[] { "default" }); ArrayList queues = new ArrayList(); queues.add(new FakeQueueInfo("default", 100.0f, true, 25)); resConf.setFakeQueues(queues); scheduler.setTaskTrackerManager(taskTrackerManager); // enabled memory-based scheduling - scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY, - 1 * 1024 * 1024 * 1024L); - scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, - 3 * 1024 * 1024 * 1024L); - resConf.setDefaultPercentOfPmemInVmem(33.3f); - resConf.setLimitMaxPmemForTasks(1536 * 1024 * 1024L); + // Normal job in the cluster would be 1GB maps/reduces + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, + 2 * 1024); + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024); + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, + 1 * 1024); + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024); scheduler.setResourceManagerConf(resConf); scheduler.start(); - // We need a situation where the scheduler needs to run a map task, - // but the available one has a high-mem requirement. There should - // be another job whose maps or reduces can run, but they shouldn't - // be scheduled. + // The situation : Two jobs in the queue. First job with only maps and no + // reduces and is a high memory job. Second job is a normal job with both maps and reduces. + // First job cannot run for want of memory for maps. In this case, second job's reduces should run. 
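To make the expected assignment below concrete, here is an illustrative restatement of the per-task-type check this commit adds to MemoryMatcher.matchesMemoryRequirements. This is a sketch, not the committed signature; the method and variable names are assumptions.

```java
// Simplified restatement of the fit check added in this commit: a task of a
// given type fits on a TT if its memory request does not exceed the memory
// backing that type's slots minus what running tasks of that type reserve.
class MemoryFitSketch {
  static boolean fits(long taskMemMB, long slotSizeMB, int maxSlots, long usedMB) {
    long usableMB = slotSizeMB * maxSlots; // total memory backing this task type's slots
    long freeMB = usableMB - usedMB;       // memory not reserved by running tasks
    return taskMemMB <= freeMB;
  }
}
```

With the test's numbers: maps get 1024 MB/slot * 2 slots = 2048 MB usable, so once job1's first 2048 MB map is running, free map memory is 0 and no further map fits; reduces get 1024 MB/slot * 1 slot with nothing used, so job2's 1024 MB reduce fits. That is why the test now expects attempt_test_0002_r_000001_0 to be assigned instead of a null return.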
- LOG.debug("Submit one high memory(2GB vmem, 400MB pmem) job of " + LOG.debug("Submit one high memory(2GB maps, 0MB reduces) job of " + "2 map tasks"); - JobConf jConf = new JobConf(); - jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L); // 2GB vmem - jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem + JobConf jConf = new JobConf(conf); + jConf.setMemoryForMapTask(2 * 1024); + jConf.setMemoryForReduceTask(0); jConf.setNumMapTasks(2); jConf.setNumReduceTasks(0); jConf.setQueueName("default"); jConf.setUser("u1"); FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf); - LOG.debug("Submit another regular memory(900MB vmem, 200MB pmem) job of " + + LOG.debug("Submit another regular memory(1GB vmem maps/reduces) job of " + "2 map/red tasks"); - jConf = new JobConf(); - jConf.setMaxVirtualMemoryForTask(900 * 1024 * 1024L); // 900MB vmem - jConf.setMaxPhysicalMemoryForTask(200 * 1024 * 1024L); // 200MB pmem + jConf = new JobConf(conf); + jConf.setMemoryForMapTask(1 * 1024); + jConf.setMemoryForReduceTask(1 * 1024); jConf.setNumMapTasks(2); jConf.setNumReduceTasks(2); jConf.setQueueName("default"); @@ -1732,75 +1584,8 @@ // first, a map from j1 will run checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); // at this point, the scheduler tries to schedule another map from j1. - // there isn't enough space. There is space to run the second job's - // map or reduce task, but they shouldn't be scheduled - assertNull(scheduler.assignTasks(tracker("tt1"))); - } - - /** - * test invalid highMemoryJobs - * @throws IOException - */ - public void testHighMemoryJobWithInvalidRequirements() - throws IOException { - LOG.debug("Starting the scheduler."); - taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1); - TaskTrackerStatus.ResourceStatus ttStatus = - taskTrackerManager.getTaskTracker("tt1").getResourceStatus(); - ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024); - ttStatus.setReservedVirtualMemory(0); - ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024); - ttStatus.setReservedPhysicalMemory(0); - - ArrayList queues = new ArrayList(); - queues.add(new FakeQueueInfo("default", 100.0f, true, 25)); - taskTrackerManager.addQueues(new String[] { "default" }); - resConf.setFakeQueues(queues); - scheduler.setTaskTrackerManager(taskTrackerManager); - // enabled memory-based scheduling - long vmemUpperLimit = 1 * 1024 * 1024 * 1024L; - long vmemDefault = 1536 * 1024 * 1024L; - long pmemUpperLimit = vmemUpperLimit; - scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY, - vmemDefault); - scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, - vmemUpperLimit); - resConf.setDefaultPercentOfPmemInVmem(33.3f); - resConf.setLimitMaxPmemForTasks(pmemUpperLimit); - scheduler.setResourceManagerConf(resConf); - scheduler.start(); - - LOG.debug("Submit one invalid high ram(5GB vmem, 3GB pmem) job of " - + "1 map, 0 reduce tasks."); - long jobMaxVmem = 5 * 1024 * 1024 * 1024L; - long jobMaxPmem = 3 * 1024 * 1024 * 1024L; - JobConf jConf = new JobConf(); - jConf.setMaxVirtualMemoryForTask(jobMaxVmem); - jConf.setMaxPhysicalMemoryForTask(jobMaxPmem); - jConf.setNumMapTasks(1); - jConf.setNumReduceTasks(0); - jConf.setQueueName("default"); - jConf.setUser("u1"); - - boolean throwsException = false; - String msg = null; - FakeJobInProgress job; - try { - job = submitJob(JobStatus.PREP, jConf); - } catch (IOException ioe) { - // job has to fail - throwsException = true; - msg = ioe.getMessage(); - } - - assertTrue(throwsException); 
- job = (FakeJobInProgress) taskTrackerManager.getJobs().toArray()[0]; - assertTrue(msg.matches(job.getJobID() + " \\(" + jobMaxVmem + "vmem, " - + jobMaxPmem + "pmem\\) exceeds the cluster's max-memory-limits \\(" - + vmemUpperLimit + "vmem, " + pmemUpperLimit - + "pmem\\). Cannot run in this cluster, so killing it.")); - // For job, no cleanup task needed so gets killed immediately. - assertTrue(job.getStatus().getRunState() == JobStatus.KILLED); + // there isn't enough space. The second job's reduce should be scheduled. + checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1"); } /** @@ -1811,13 +1596,7 @@ throws IOException { LOG.debug("Starting the scheduler."); - taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1); - TaskTrackerStatus.ResourceStatus ttStatus = - taskTrackerManager.getTaskTracker("tt1").getResourceStatus(); - ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L); - ttStatus.setReservedVirtualMemory(0); - ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024L); - ttStatus.setReservedPhysicalMemory(0); + taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2); ArrayList queues = new ArrayList(); queues.add(new FakeQueueInfo("default", 100.0f, true, 25)); @@ -1825,68 +1604,65 @@ resConf.setFakeQueues(queues); scheduler.setTaskTrackerManager(taskTrackerManager); // enabled memory-based scheduling - scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY, - 1536 * 1024 * 1024L); - scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, - 4 * 1024 * 1024 * 1024L); - resConf.setDefaultPercentOfPmemInVmem(33.3f); - resConf.setLimitMaxPmemForTasks(2 * 1024 * 1024 * 1024L); + // Normal jobs 1GB maps/reduces. 2GB limit on maps/reduces + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, + 2 * 1024); + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024); + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, + 2 * 1024); + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024); scheduler.setResourceManagerConf(resConf); scheduler.start(); - LOG.debug("Submit one high memory(4GB vmem, 512MB pmem) job of " + LOG.debug("Submit one normal memory(1GB maps/reduces) job of " + "1 map, 0 reduce tasks."); - JobConf jConf = new JobConf(); - jConf.setMaxVirtualMemoryForTask(4 * 1024 * 1024 * 1024L); - jConf.setMaxPhysicalMemoryForTask(512 * 1024 * 1024L); + JobConf jConf = new JobConf(conf); + jConf.setMemoryForMapTask(1 * 1024); + jConf.setMemoryForReduceTask(1 * 1024); jConf.setNumMapTasks(1); - jConf.setNumReduceTasks(0); + jConf.setNumReduceTasks(1); jConf.setQueueName("default"); jConf.setUser("u1"); FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf); - // TTs should not run these jobs i.e. cluster blocked because of lack of - // vmem - assertNull(scheduler.assignTasks(tracker("tt1"))); - assertNull(scheduler.assignTasks(tracker("tt1"))); - // Job should still be alive - assertTrue(job1.getStatus().getRunState() == JobStatus.RUNNING); - - LOG.debug("Submit a normal job of 1 map, 0 reduce tasks."); - // Use cluster-wide defaults - jConf.setMaxVirtualMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT); - jConf.setMaxPhysicalMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT); + // Fill the second tt with this job. 
+ checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2"); + checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2"); + + LOG.debug("Submit one high memory(2GB maps/reduces) job of " + + "2 map, 2 reduce tasks."); + jConf = new JobConf(conf); + jConf.setMemoryForMapTask(2 * 1024); + jConf.setMemoryForReduceTask(2 * 1024); + jConf.setNumMapTasks(2); + jConf.setNumReduceTasks(2); + jConf.setQueueName("default"); + jConf.setUser("u1"); FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf); - // cluster should still be blocked for job1 and so even job2 should not run - // even though it is a normal job - assertNull(scheduler.assignTasks(tracker("tt1"))); - - scheduler.taskTrackerManager.killJob(job2.getJobID()); - scheduler.taskTrackerManager.killJob(job1.getJobID()); + checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1"); + checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1"); - LOG.debug("Submit one high memory(2GB vmem, 2GB pmem) job of " + LOG.debug("Submit one normal memory(1GB maps/reduces) job of " + "1 map, 0 reduce tasks."); - jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L); - jConf.setMaxPhysicalMemoryForTask(2 * 1024 * 1024 * 1024L); + jConf = new JobConf(conf); + jConf.setMemoryForMapTask(1 * 1024); + jConf.setMemoryForReduceTask(1 * 1024); + jConf.setNumMapTasks(1); + jConf.setNumReduceTasks(1); + jConf.setQueueName("default"); + jConf.setUser("u1"); FakeJobInProgress job3 = submitJobAndInit(JobStatus.PREP, jConf); - // TTs should not run these jobs i.e. cluster blocked because of lack of - // pmem now. - assertNull(scheduler.assignTasks(tracker("tt1"))); - assertNull(scheduler.assignTasks(tracker("tt1"))); - - // Job should still be alive - assertTrue(job3.getStatus().getRunState() == JobStatus.RUNNING); - LOG.debug("Submit a normal job of 1 map, 0 reduce tasks."); - // Use cluster-wide defaults - jConf.setMaxVirtualMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT); - jConf.setMaxPhysicalMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT); - submitJobAndInit(JobStatus.PREP, jConf); // job4 - - // cluster should still be blocked for job3 and so even job4 should not run - // even though it is a normal job + // Job2 cannot fit on tt2 or tt1. Blocking. Job3 also will not run. assertNull(scheduler.assignTasks(tracker("tt1"))); + assertNull(scheduler.assignTasks(tracker("tt2"))); + assertNull(scheduler.assignTasks(tracker("tt1"))); + assertNull(scheduler.assignTasks(tracker("tt2"))); } /** @@ -1900,13 +1676,6 @@ // create a cluster with a single node. 
LOG.debug("Starting cluster with 1 tasktracker, 2 map and 2 reduce slots"); taskTrackerManager = new FakeTaskTrackerManager(1, 2, 2); - TaskTrackerStatus.ResourceStatus ttStatus = - taskTrackerManager.getTaskTracker("tt1").getResourceStatus(); - LOG.debug("Assume TT has 4 GB virtual mem and 2 GB RAM"); - ttStatus.setTotalVirtualMemory(4 * 1024 * 1024 * 1024L); - ttStatus.setReservedVirtualMemory(0); - ttStatus.setTotalPhysicalMemory(2 * 1024 * 1024 * 1024L); - ttStatus.setReservedPhysicalMemory(0); // create scheduler ArrayList queues = new ArrayList(); @@ -1915,14 +1684,17 @@ resConf.setFakeQueues(queues); scheduler.setTaskTrackerManager(taskTrackerManager); // enabled memory-based scheduling - LOG.debug("By default, jobs get 0.5 GB per task vmem" + - " and 2 GB max vmem, with 50% of it for RAM"); - scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY, - 512 * 1024 * 1024L); - scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, - 2 * 1024 * 1024 * 1024L); - resConf.setDefaultPercentOfPmemInVmem(50.0f); - resConf.setLimitMaxPmemForTasks(1 * 1024 * 1024 * 1024L); + LOG.debug("Assume TT has 2GB for maps and 2GB for reduces"); + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, + 2 * 1024L); + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 512); + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, + 2 * 1024L); + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 512); scheduler.setResourceManagerConf(resConf); scheduler.start(); @@ -1931,6 +1703,8 @@ JobConf jConf = new JobConf(); jConf.setNumMapTasks(2); jConf.setNumReduceTasks(2); + jConf.setMemoryForMapTask(512); + jConf.setMemoryForReduceTask(512); jConf.setQueueName("default"); jConf.setUser("u1"); FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf); @@ -1949,6 +1723,8 @@ jConf = new JobConf(); jConf.setNumMapTasks(1); jConf.setNumReduceTasks(1); + jConf.setMemoryForMapTask(512); + jConf.setMemoryForReduceTask(512); jConf.setQueueName("default"); jConf.setUser("u1"); FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf); @@ -2396,21 +2172,8 @@ */ public void testHighRamJobWithSpeculativeExecution() throws IOException { // 2 map and 2 reduce slots - taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2); - - //task tracker memory configurations. 
- TaskTrackerStatus.ResourceStatus ttStatus = - taskTrackerManager.getTaskTracker("tt1").getResourceStatus(); - ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L); - ttStatus.setReservedVirtualMemory(0); - ttStatus.setTotalPhysicalMemory(1536 * 1024 * 1024L); - ttStatus.setReservedPhysicalMemory(0); - ttStatus = taskTrackerManager.getTaskTracker("tt2").getResourceStatus(); - ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L); - ttStatus.setReservedVirtualMemory(0); - ttStatus.setTotalPhysicalMemory(1536 * 1024 * 1024L); - ttStatus.setReservedPhysicalMemory(0); - + taskTrackerManager = new FakeTaskTrackerManager(2, 3, 3); + // 1GB for each map, 1GB for each reduce taskTrackerManager.addQueues(new String[] { "default" }); ArrayList queues = new ArrayList(); @@ -2418,19 +2181,23 @@ resConf.setFakeQueues(queues); scheduler.setTaskTrackerManager(taskTrackerManager); // enabled memory-based scheduling - scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY, - 1 * 1024 * 1024 * 1024L); - scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, - 3 * 1024 * 1024 * 1024L); - resConf.setDefaultPercentOfPmemInVmem(33.3f); - resConf.setLimitMaxPmemForTasks(1536 * 1024 * 1024L); + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, + 3 * 1024L); + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024L); + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, + 3 * 1024L); + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024L); scheduler.setResourceManagerConf(resConf); scheduler.start(); - + //Submit a high memory job with speculative tasks. JobConf jConf = new JobConf(); - jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L); // 2GB vmem - jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem + jConf.setMemoryForMapTask(2 * 1024); + jConf.setMemoryForReduceTask(0); jConf.setNumMapTasks(1); jConf.setNumReduceTasks(0); jConf.setQueueName("default"); @@ -2439,20 +2206,18 @@ jConf.setReduceSpeculativeExecution(false); FakeJobInProgress job1 = new FakeJobInProgress(new JobID("test", ++jobCounter), jConf, taskTrackerManager,"u1"); - - //Submit a high memory job with speculative tasks. taskTrackerManager.submitJob(job1); - + + //Submit normal job jConf = new JobConf(); - jConf.setMaxVirtualMemoryForTask(100 * 1024 * 1024L); // 100MB vmem - jConf.setMaxPhysicalMemoryForTask(50 * 1024 * 1024L); // 50MB pmem + jConf.setMemoryForMapTask(1 * 1024); + jConf.setMemoryForReduceTask(0); jConf.setNumMapTasks(1); jConf.setNumReduceTasks(0); jConf.setQueueName("default"); jConf.setUser("u1"); jConf.setMapSpeculativeExecution(false); jConf.setReduceSpeculativeExecution(false); - //Submit normal job FakeJobInProgress job2 = submitJob(JobStatus.PREP, jConf); controlledInitializationPoller.selectJobsToInitialize(); @@ -2487,8 +2252,8 @@ //Now submit high ram job with speculative reduce and check. 
jConf = new JobConf(); - jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L); // 2GB vmem - jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem + jConf.setMemoryForMapTask(2 * 1024); + jConf.setMemoryForReduceTask(2 * 1024L); jConf.setNumMapTasks(1); jConf.setNumReduceTasks(1); jConf.setQueueName("default"); @@ -2497,76 +2262,43 @@ jConf.setReduceSpeculativeExecution(true); FakeJobInProgress job3 = new FakeJobInProgress(new JobID("test", ++jobCounter), jConf, taskTrackerManager,"u1"); - - //Submit a high memory job with speculative reduce tasks. taskTrackerManager.submitJob(job3); - + + //Submit normal job w.r.t reduces jConf = new JobConf(); - jConf.setMaxVirtualMemoryForTask(100 * 1024 * 1024L); // 100MB vmem - jConf.setMaxPhysicalMemoryForTask(50 * 1024 * 1024L); // 50MB pmem + jConf.setMemoryForMapTask(2 * 1024L); + jConf.setMemoryForReduceTask(1 * 104L); jConf.setNumMapTasks(1); jConf.setNumReduceTasks(1); jConf.setQueueName("default"); jConf.setUser("u1"); jConf.setMapSpeculativeExecution(false); jConf.setReduceSpeculativeExecution(false); - //Submit normal job FakeJobInProgress job4 = submitJob(JobStatus.PREP, jConf); controlledInitializationPoller.selectJobsToInitialize(); raiseStatusChangeEvents(scheduler.jobQueuesManager); - //all maps of jobs get assigned to same task tracker as - //job does not have speculative map and same tracker sends two heart - //beat back to back. + + // Finish up the map scheduler checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1"); - //first map slot gets attention on this tracker. checkAssignment("tt2", "attempt_test_0004_m_000001_0 on tt2"); - //now first reduce of the job3 would be scheduled on tt2 since it has - //memory. - //assigntasks() would check for free reduce slot is greater than - //map slots. Seeing there is more free reduce slot it would try scheduling - //reduce of job1 but would block as in it is a high memory task. - assertNull(scheduler.assignTasks(tracker("tt1"))); - //TT2 would get the reduce task from high memory job as the tt is running - //normal jobs map. which is low mem. - checkAssignment("tt2", "attempt_test_0003_r_000001_0 on tt2"); - // now if either TT comes back, it will block because all maps - // are done, and the first jobs reduce has a speculative task. + + // first, a reduce from j3 will run + // at this point, there is a speculative task for the same job to be + //scheduled. This task would be scheduled. Till the tasks from job3 gets + //complete none of the tasks from other jobs would be scheduled. + checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1"); + assertEquals("pending reduces greater than zero " , job3.pendingMaps(), 0); + //make same tracker get back, check if you are blocking. Your job + //has speculative reduce task so tracker should be blocked even tho' it + //can run job4's reduce. assertNull(scheduler.assignTasks(tracker("tt1"))); - assertNull(scheduler.assignTasks(tracker("tt2"))); - //finish maps. - taskTrackerManager.finishTask("tt1", "attempt_test_0003_m_000001_0", - job3); - taskTrackerManager.finishTask("tt2", "attempt_test_0004_m_000001_0", - job4); - //check speculative reduce code path is covered. - assertEquals("Pending reduces not zero for high " + - "ram job with speculative reduce.", 0, job3.pendingReduces()); - //if tt2 returns back it is not given any task even if it can schedule - //job2 reduce. - assertNull(scheduler.assignTasks(tracker("tt2"))); - //speculative reduce of the job3 would be scheduled. 
- checkAssignment("tt1", "attempt_test_0003_r_000001_1 on tt1");
- //now both speculative and actual task have been scheduled for job3.
- //Normal task of Job4 would now be scheduled on TT1 as it has free space
- //to run.
+ //TT2 now gets the speculative reduce of job3
+ checkAssignment("tt2", "attempt_test_0003_r_000001_1 on tt2");
+
+ // Now that j3 has no more speculative reduces, j4's reduce can
+ // be scheduled.
checkAssignment("tt1", "attempt_test_0004_r_000001_0 on tt1");
- //No more tasks.
- assertNull(scheduler.assignTasks(tracker("tt2")));
- assertNull(scheduler.assignTasks(tracker("tt1")));
-
- //finish all the reduces.
- taskTrackerManager.finishTask("tt1", "attempt_test_0003_r_000001_1",
- job3);
- taskTrackerManager.finishTask("tt2", "attempt_test_0003_r_000001_0",
- job3);
- //finish the job
- taskTrackerManager.finalizeJob(job3);
- //finish the task and the job.
- taskTrackerManager.finishTask("tt1", "attempt_test_0004_r_000001_0",
- job4);
- taskTrackerManager.finalizeJob(job4);
- }

private void checkFailedInitializedJobMovement() throws IOException {

Modified: hadoop/core/branches/branch-0.20/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/mapred-default.xml?rev=778700&r1=778699&r2=778700&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/mapred-default.xml (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/mapred-default.xml Tue May 26 13:35:12 2009
@@ -186,42 +186,6 @@
- mapred.tasktracker.pmem.reserved
- -1
- Configuration property to specify the amount of physical memory
- that has to be reserved by the TaskTracker for system usage (OS, TT etc).
- The reserved physical memory should be a part of the total physical memory
- available on the TaskTracker.
-
- The reserved physical memory and the total physical memory values are
- reported by the TaskTracker as part of heart-beat so that they can be
- considered by a scheduler. Please refer to the documentation of the
- configured scheduler to see how this property is used.
-
-
-
-
- mapred.task.default.maxvmem
- -1
-
- Cluster-wide configuration in bytes to be set by the administrators that
- provides default amount of maximum virtual memory for job's tasks. This has
- to be set on both the JobTracker node for the sake of scheduling decisions
- and on the TaskTracker nodes for the sake of memory management.
-
- If a job doesn't specify its virtual memory requirement by setting
- mapred.task.maxvmem to -1, tasks are assured a memory limit set
- to this property. This property is set to -1 by default.
-
- This value should in general be less than the cluster-wide
- configuration mapred.task.limit.maxvmem. If not or if it is not set,
- TaskTracker's memory management will be disabled and a scheduler's memory
- based scheduling decisions may be affected. Please refer to the
- documentation of the configured scheduler to see how this property is used.
-
-
-
mapred.task.limit.maxvmem
-1
@@ -272,23 +236,6 @@
- mapred.task.maxpmem
- -1
- The maximum amount of physical memory any task of a job will use in bytes.
-
- This value may be used by schedulers that support scheduling based on job's
- memory requirements. In general, a task of this job will be scheduled on a
- TaskTracker, only if the amount of physical memory still unoccupied on the
- TaskTracker is greater than or equal to this value.
Different schedulers can - take different decisions, some might just ignore this value. Please refer to - the documentation of the scheduler being configured to see if it does - memory based scheduling and if it does, how this variable is used by that - scheduler. - - - - mapred.tasktracker.memory_calculator_plugin Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobConf.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobConf.java?rev=778700&r1=778699&r2=778700&view=diff ============================================================================== --- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobConf.java (original) +++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobConf.java Tue May 26 13:35:12 2009 @@ -123,111 +123,11 @@ */ public static final String DEFAULT_QUEUE_NAME = "default"; - /** - * Cluster-wide configuration to be set by the administrators that provides - * default amount of maximum virtual memory for job's tasks. This has to be - * set on both the JobTracker node for the sake of scheduling decisions and on - * the TaskTracker nodes for the sake of memory management. - * - *
- * <p/>
- * - * If a job doesn't specify its virtual memory requirement by setting - * {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to {@link #DISABLED_MEMORY_LIMIT}, - * tasks are assured a memory limit set to this property. This property is - * disabled by default, and if not explicitly set to a valid value by the - * administrators and if a job doesn't specify its virtual memory - * requirements, the job's tasks will not be assured anything and may be - * killed by a TT that intends to control the total memory usage of the tasks - * via memory management functionality. - * - *
- * <p/>
- * - * This value should in general be less than the cluster-wide configuration - * {@link #UPPER_LIMIT_ON_TASK_VMEM_PROPERTY} . If not or if it not set, - * TaskTracker's memory management may be disabled and a scheduler's memory - * based scheduling decisions will be affected. Please refer to the - * documentation of the configured scheduler to see how this property is used. - */ - public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY = - "mapred.task.default.maxvmem"; + static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY = + "mapred.job.map.memory.mb"; - /** - * The maximum amount of memory any task of this job will use. - * - *
- * <p/>
- * - * This value will be used by TaskTrackers for monitoring the memory usage of - * tasks of this jobs. If a TaskTracker's memory management functionality is - * enabled, each task of this job will be allowed to use a maximum virtual - * memory specified by this property. If the task's memory usage goes over - * this value, the task will be failed by the TT. If not set, the cluster-wide - * configuration {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} is used as the - * default value for memory requirements. If this property cascaded with - * {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} becomes equal to -1, job's - * tasks will not be assured anything and may be killed by a TT that intends - * to control the total memory usage of the tasks via memory management - * functionality. If the memory management functionality is disabled on a TT, - * this value is ignored. - * - *
- * <p/>
- * - * This value should also be not more than the cluster-wide configuration - * {@link #UPPER_LIMIT_ON_TASK_VMEM_PROPERTY} which has to be set by the site - * administrators. - * - *
- * <p/>
- * - * This value may be used by schedulers that support scheduling based on job's - * memory requirements. In general, a task of this job will be scheduled on a - * TaskTracker only if the amount of virtual memory still unoccupied on the - * TaskTracker is greater than or equal to this value. But different - * schedulers can take different decisions. Please refer to the documentation - * of the scheduler being configured to see if it does memory based scheduling - * and if it does, how this property is used by that scheduler. - * - * @see #setMaxVirtualMemoryForTask(long) - * @see #getMaxVirtualMemoryForTask() - */ - public static final String MAPRED_TASK_MAXVMEM_PROPERTY = - "mapred.task.maxvmem"; - - /** - * The maximum amount of physical memory any task of a job will use. - * - *
- * <p/>
- * - * This value may be used by schedulers that support scheduling based on job's - * memory requirements. In general, a task of this job will be scheduled on a - * TaskTracker, only if the amount of physical memory still unoccupied on the - * TaskTracker is greater than or equal to this value. But different - * schedulers can take different decisions. Please refer to the documentation - * of the scheduler being configured to see how it does memory based - * scheduling and how this variable is used by that scheduler. - * - * @see #setMaxPhysicalMemoryForTask(long) - * @see #getMaxPhysicalMemoryForTask() - */ - public static final String MAPRED_TASK_MAXPMEM_PROPERTY = - "mapred.task.maxpmem"; - - /** - * Cluster-wide configuration to be set by the site administrators that - * provides an upper limit on the maximum virtual memory that can be specified - * by a job. The job configuration {@link #MAPRED_TASK_MAXVMEM_PROPERTY} and - * the cluster-wide configuration - * {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} should, by definition, be - * less than this value. If the job configuration - * {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} is more than this value, - * depending on the scheduler being configured, the job may be rejected or the - * job configuration may just be ignored. - * - *
- * <p/>
- * - * If it is not set on a TaskTracker, TaskTracker's memory management will be - * disabled. - */ - public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY = - "mapred.task.limit.maxvmem"; + static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY = + "mapred.job.reduce.memory.mb"; /** * Construct a map/reduce job configuration. @@ -1491,53 +1391,23 @@ public String getJobLocalDir() { return get("job.local.dir"); } - - /** - * The maximum amount of memory any task of this job will use. See - * {@link #MAPRED_TASK_MAXVMEM_PROPERTY} - * - * @return The maximum amount of memory any task of this job will use, in - * bytes. - * @see #setMaxVirtualMemoryForTask(long) - */ - public long getMaxVirtualMemoryForTask() { - return getLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT); + + long getMemoryForMapTask() { + return getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, + DISABLED_MEMORY_LIMIT); } - /** - * Set the maximum amount of memory any task of this job can use. See - * {@link #MAPRED_TASK_MAXVMEM_PROPERTY} - * - * @param vmem Maximum amount of virtual memory in bytes any task of this job - * can use. - * @see #getMaxVirtualMemoryForTask() - */ - public void setMaxVirtualMemoryForTask(long vmem) { - setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, vmem); + void setMemoryForMapTask(long mem) { + setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem); } - /** - * The maximum amount of physical memory any task of this job will use. See - * {@link #MAPRED_TASK_MAXPMEM_PROPERTY} - * - * @return The maximum amount of physical memory any task of this job will - * use, in bytes. - * @see #setMaxPhysicalMemoryForTask(long) - */ - public long getMaxPhysicalMemoryForTask() { - return getLong(JobConf.MAPRED_TASK_MAXPMEM_PROPERTY, DISABLED_MEMORY_LIMIT); + long getMemoryForReduceTask() { + return getLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, + DISABLED_MEMORY_LIMIT); } - /** - * Set the maximum amount of physical memory any task of this job can use. See - * {@link #MAPRED_TASK_MAXPMEM_PROPERTY} - * - * @param pmem Maximum amount of physical memory in bytes any task of this job - * can use. - * @see #getMaxPhysicalMemoryForTask() - */ - public void setMaxPhysicalMemoryForTask(long pmem) { - setLong(JobConf.MAPRED_TASK_MAXPMEM_PROPERTY, pmem); + void setMemoryForReduceTask(long mem) { + setLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, mem); } /** @@ -1559,6 +1429,19 @@ set("mapred.job.queue.name", queueName); } + /** + * Normalize the negative values in configuration + * + * @param val + * @return normalized value + */ + public static long normalizeMemoryConfigValue(long val) { + if (val < 0) { + val = DISABLED_MEMORY_LIMIT; + } + return val; + } + /** * Find a jar that contains a class of the same name, if any. 
* It will return a jar file, even if that is not the first thing Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=778700&r1=778699&r2=778700&view=diff ============================================================================== --- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original) +++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Tue May 26 13:35:12 2009 @@ -175,8 +175,6 @@ private boolean hasSpeculativeMaps; private boolean hasSpeculativeReduces; private long inputLength = 0; - private long maxVirtualMemoryForTask; - private long maxPhysicalMemoryForTask; // Per-job counters public static enum Counter { @@ -283,8 +281,6 @@ this.nonRunningReduces = new LinkedList(); this.runningReduces = new LinkedHashSet(); this.resourceEstimator = new ResourceEstimator(this); - setMaxVirtualMemoryForTask(conf.getMaxVirtualMemoryForTask()); - setMaxPhysicalMemoryForTask(conf.getMaxPhysicalMemoryForTask()); } /** @@ -568,23 +564,6 @@ JobHistory.JobInfo.logJobPriority(jobId, priority); } - // Accessors for resources. - long getMaxVirtualMemoryForTask() { - return maxVirtualMemoryForTask; - } - - void setMaxVirtualMemoryForTask(long maxVMem) { - maxVirtualMemoryForTask = maxVMem; - } - - long getMaxPhysicalMemoryForTask() { - return maxPhysicalMemoryForTask; - } - - void setMaxPhysicalMemoryForTask(long maxPMem) { - maxPhysicalMemoryForTask = maxPMem; - } - // Update the job start/launch time (upon restart) and log to history synchronized void updateJobInfo(long startTime, long launchTime) { // log and change to the job's start/launch time Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=778700&r1=778699&r2=778700&view=diff ============================================================================== --- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original) +++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java Tue May 26 13:35:12 2009 @@ -1482,6 +1482,11 @@ Path systemDir = null; private JobConf conf; + long limitMaxMemForMapTasks; + long limitMaxMemForReduceTasks; + long memSizeForMapSlotOnJT; + long memSizeForReduceSlotOnJT; + private QueueManager queueManager; /** @@ -1510,6 +1515,8 @@ this.conf = conf; JobConf jobConf = new JobConf(conf); + initializeTaskMemoryRelatedConfig(); + // Read the hosts/exclude files to restrict access to the jobtracker. this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""), conf.get("mapred.hosts.exclude", "")); @@ -2940,6 +2947,15 @@ throw ioe; } + // Check the job if it cannot run in the cluster because of invalid memory + // requirements. 
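The try/catch added just below wires the new checkMemoryRequirements() validation into job submission. Seen from a client, the intended effect is roughly the following (a sketch under our reading of the patch; the job setup is hypothetical and the input/output wiring is elided):

    import java.io.IOException;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;

    public class SubmissionCheckSketch {
      public static void main(String[] args) {
        JobConf jobConf = new JobConf();
        // Request more than an assumed 3 GB cluster-wide ceiling:
        jobConf.setLong("mapred.job.map.memory.mb", 8 * 1024L);
        jobConf.setLong("mapred.job.reduce.memory.mb", 1 * 1024L);
        try {
          JobClient.runJob(jobConf); // mapper, input and output paths elided
        } catch (IOException rejected) {
          // With the cluster limits configured, the JobTracker now rejects
          // the job at submission, e.g. "... Exceeds the cluster's
          // max-memory-limit.", instead of letting its tasks fail later.
        }
      }
    }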
+ try {
+ checkMemoryRequirements(job);
+ } catch (IOException ioe) {
+ new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
+ throw ioe;
+ }
+
return addJob(jobId, job); }
@@ -3199,6 +3215,16 @@
}
TaskCompletionEvent[] EMPTY_EVENTS = new TaskCompletionEvent[0];
+
+ static final String MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY =
+ "mapred.cluster.map.memory.mb";
+ static final String MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY =
+ "mapred.cluster.reduce.memory.mb";
+
+ static final String MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY =
+ "mapred.cluster.max.map.memory.mb";
+ static final String MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY =
+ "mapred.cluster.max.reduce.memory.mb";

/* * Returns a list of TaskCompletionEvent for the given job,
@@ -3595,4 +3621,81 @@
}
SecurityUtil.getPolicy().refresh(); }
+
+ private void initializeTaskMemoryRelatedConfig() {
+ memSizeForMapSlotOnJT =
+ JobConf.normalizeMemoryConfigValue(conf.getLong(
+ JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+ JobConf.DISABLED_MEMORY_LIMIT));
+ memSizeForReduceSlotOnJT =
+ JobConf.normalizeMemoryConfigValue(conf.getLong(
+ JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
+ JobConf.DISABLED_MEMORY_LIMIT));
+ limitMaxMemForMapTasks =
+ JobConf.normalizeMemoryConfigValue(conf.getLong(
+ JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+ JobConf.DISABLED_MEMORY_LIMIT));
+ limitMaxMemForReduceTasks =
+ JobConf.normalizeMemoryConfigValue(conf.getLong(
+ JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+ JobConf.DISABLED_MEMORY_LIMIT));
+ LOG.info(new StringBuilder().append("Scheduler configured with ").append(
+ "(memSizeForMapSlotOnJT, memSizeForReduceSlotOnJT,").append(
+ " limitMaxMemForMapTasks, limitMaxMemForReduceTasks) (").append(
+ memSizeForMapSlotOnJT).append(", ").append(memSizeForReduceSlotOnJT)
+ .append(", ").append(limitMaxMemForMapTasks).append(", ").append(
+ limitMaxMemForReduceTasks).append(")"));
+ }
+
+ private boolean perTaskMemoryConfigurationSetOnJT() {
+ if (limitMaxMemForMapTasks == JobConf.DISABLED_MEMORY_LIMIT
+ || limitMaxMemForReduceTasks == JobConf.DISABLED_MEMORY_LIMIT
+ || memSizeForMapSlotOnJT == JobConf.DISABLED_MEMORY_LIMIT
+ || memSizeForReduceSlotOnJT == JobConf.DISABLED_MEMORY_LIMIT) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Check whether the job has invalid memory requirements, and throw an
+ * IOException if it does.
+ *
+ * @param job
+ * @throws IOException
+ */
+ private void checkMemoryRequirements(JobInProgress job)
+ throws IOException {
+ if (!perTaskMemoryConfigurationSetOnJT()) {
+ LOG.debug("Per-Task memory configuration is not set on JT. "
+ + "Not checking the job for invalid memory requirements.");
+ return;
+ }
+
+ boolean invalidJob = false;
+ String msg = "";
+ long maxMemForMapTask = job.getJobConf().getMemoryForMapTask();
+ long maxMemForReduceTask = job.getJobConf().getMemoryForReduceTask();
+
+ if (maxMemForMapTask == JobConf.DISABLED_MEMORY_LIMIT
+ || maxMemForReduceTask == JobConf.DISABLED_MEMORY_LIMIT) {
+ invalidJob = true;
+ msg = "Invalid job requirements.";
+ }
+
+ if (maxMemForMapTask > limitMaxMemForMapTasks
+ || maxMemForReduceTask > limitMaxMemForReduceTasks) {
+ invalidJob = true;
+ msg = "Exceeds the cluster's max-memory-limit.";
+ }
+
+ if (invalidJob) {
+ StringBuilder jobStr =
+ new StringBuilder().append(job.getJobID().toString()).append("(")
+ .append(maxMemForMapTask).append(" memForMapTasks ").append(
+ maxMemForReduceTask).append(" memForReduceTasks): ");
+ LOG.warn(jobStr.toString() + msg);
+
+ throw new IOException(jobStr.toString() + msg);
+ }
+ }
}

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java?rev=778700&r1=778699&r2=778700&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java Tue May 26 13:35:12 2009
@@ -59,8 +59,7 @@
tasksToBeRemoved = new ArrayList();
maxMemoryAllowedForAllTasks =
- taskTracker.getTotalVirtualMemoryOnTT()
- - taskTracker.getReservedVirtualMemory();
+ taskTracker.getTotalMemoryAllottedForTasksOnTT() * 1024 * 1024L;
monitoringInterval = taskTracker.getJobConf().getLong(
"mapred.tasktracker.taskmemorymanager.monitoring-interval", 5000L);
@@ -205,17 +204,6 @@
LOG.info("Memory usage of ProcessTree " + pId + " :" + currentMemUsage + "bytes. Limit : " + limit + "bytes");
- if (limit > taskTracker.getLimitMaxVMemPerTask()) {
- // TODO: With monitoring enabled and no scheduling based on
- // memory,users can seriously hijack the system by specifying memory
- // requirements well above the cluster wide limit. Ideally these
- // jobs
- // should have been rejected by JT/scheduler. Because we can't do
- // that, in the minimum we should fail the tasks and hence the job.
- LOG.warn("Task " + tid
- + " 's maxVmemPerTask is greater than TT's limitMaxVmPerTask");
- }
-
if (limit != JobConf.DISABLED_MEMORY_LIMIT && currentMemUsage > limit) {
// Task (the root process) is still alive and overflowing memory.
@@ -245,12 +233,11 @@
}
}
- LOG.debug("Memory still in usage across all tasks : " + memoryStillInUsage
- + "bytes. Total limit : " + maxMemoryAllowedForAllTasks);
if (memoryStillInUsage > maxMemoryAllowedForAllTasks) {
- LOG.warn("The total memory usage is still overflowing TTs limits."
- + " Trying to kill a few tasks with the least progress.");
+ LOG.warn("The total memory in use " + memoryStillInUsage
+ + " is still overflowing the TT's limit "
+ + maxMemoryAllowedForAllTasks
+ + ".
Trying to kill a few tasks with the least progress."); killTasksWithLeastProgress(memoryStillInUsage); } Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskRunner.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=778700&r1=778699&r2=778700&view=diff ============================================================================== --- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original) +++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Tue May 26 13:35:12 2009 @@ -373,7 +373,7 @@ taskid.toString(), t.isTaskCleanupTask())), this.conf).toString(); t.setPidFile(pidFile); - tracker.addToMemoryManager(t.getTaskID(), conf, pidFile); + tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf, pidFile); // set memory limit using ulimit if feasible and necessary ... String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
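A closing arithmetic note on the TaskMemoryManagerThread hunk above: the monitor's budget is now derived from one MB-valued total reported by the TaskTracker, rather than from total minus reserved virtual memory. A worked sketch with illustrative numbers (the slot counts and slot size are assumptions, not values taken from the patch):

    public class TTBudgetSketch {
      public static void main(String[] args) {
        // Assume a TaskTracker with 3 map slots and 3 reduce slots,
        // each slot worth 1 GB:
        long totalMemoryAllottedForTasksMB = (3 + 3) * 1024L; // 6144 MB
        // TaskMemoryManagerThread converts that single figure to bytes:
        long maxMemoryAllowedForAllTasks =
            totalMemoryAllottedForTasksMB * 1024 * 1024L;
        System.out.println(maxMemoryAllowedForAllTasks); // 6442450944 bytes
      }
    }

Collapsing the budget to one figure also removes the need for the per-task vmem cross-check that the hunk above deletes, since oversized jobs are now rejected at the JobTracker before their tasks ever reach a TaskTracker.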