Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java?rev=885145&r1=885144&r2=885145&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java Sat Nov 28 20:26:01 2009 @@ -20,29 +20,21 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; +import org.apache.hadoop.conf.Configuration; class MemoryMatcher { private static final Log LOG = LogFactory.getLog(MemoryMatcher.class); - private CapacityTaskScheduler scheduler; + static long memSizeForMapSlotOnJT = JobConf.DISABLED_MEMORY_LIMIT; + static long memSizeForReduceSlotOnJT = JobConf.DISABLED_MEMORY_LIMIT; + static long limitMaxMemForMapTasks = JobConf.DISABLED_MEMORY_LIMIT; + static long limitMaxMemForReduceTasks = JobConf.DISABLED_MEMORY_LIMIT; - public MemoryMatcher(CapacityTaskScheduler capacityTaskScheduler) { - this.scheduler = capacityTaskScheduler; - } - boolean isSchedulingBasedOnMemEnabled() { - if (scheduler.getLimitMaxMemForMapSlot() - == JobConf.DISABLED_MEMORY_LIMIT - || scheduler.getLimitMaxMemForReduceSlot() - == JobConf.DISABLED_MEMORY_LIMIT - || scheduler.getMemSizeForMapSlot() - == JobConf.DISABLED_MEMORY_LIMIT - || scheduler.getMemSizeForReduceSlot() - == JobConf.DISABLED_MEMORY_LIMIT) { - return false; - } - return true; + public MemoryMatcher() { } /** @@ -68,12 +60,12 @@ // Get the memory "allotted" for this task based on number of slots long myVmem = 0; if (task.getIsMap() && taskType == TaskType.MAP) { - long memSizePerMapSlot = scheduler.getMemSizeForMapSlot(); + long memSizePerMapSlot = getMemSizeForMapSlot(); myVmem = memSizePerMapSlot * task.getNumSlots(); } else if (!task.getIsMap() && taskType == TaskType.REDUCE) { - long memSizePerReduceSlot = scheduler.getMemSizeForReduceSlot(); + long memSizePerReduceSlot = getMemSizeForReduceSlot(); myVmem = memSizePerReduceSlot * task.getNumSlots(); } vmem += myVmem; @@ -108,11 +100,11 @@ if (taskType == TaskType.MAP) { memForThisTask = job.getJobConf().getMemoryForMapTask(); totalMemUsableOnTT = - scheduler.getMemSizeForMapSlot() * taskTracker.getMaxMapSlots(); + getMemSizeForMapSlot() * taskTracker.getMaxMapSlots(); } else if (taskType == TaskType.REDUCE) { memForThisTask = job.getJobConf().getMemoryForReduceTask(); totalMemUsableOnTT = - scheduler.getMemSizeForReduceSlot() + getMemSizeForReduceSlot() * taskTracker.getMaxReduceSlots(); } @@ -135,4 +127,105 @@ } return true; } + + static boolean isSchedulingBasedOnMemEnabled() { + if (getLimitMaxMemForMapSlot() + == JobConf.DISABLED_MEMORY_LIMIT + || getLimitMaxMemForReduceSlot() + == JobConf.DISABLED_MEMORY_LIMIT + || getMemSizeForMapSlot() + == JobConf.DISABLED_MEMORY_LIMIT + || getMemSizeForReduceSlot() + == JobConf.DISABLED_MEMORY_LIMIT) { + return false; + } + return true; + } + + public static void initializeMemoryRelatedConf(Configuration conf) { + //handling @deprecated + if (conf.get( + CapacitySchedulerConf.DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY) != + null) { + LOG.warn( + JobConf.deprecatedString( + CapacitySchedulerConf.DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY)); + } + + //handling @deprecated + if (conf.get(CapacitySchedulerConf.UPPER_LIMIT_ON_TASK_PMEM_PROPERTY) != + null) { + LOG.warn( + JobConf.deprecatedString( + CapacitySchedulerConf.UPPER_LIMIT_ON_TASK_PMEM_PROPERTY)); + } + + if (conf.get(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY) != null) { + LOG.warn( + JobConf.deprecatedString( + JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY)); + } + + memSizeForMapSlotOnJT = + JobConf.normalizeMemoryConfigValue(conf.getLong( + MRConfig.MAPMEMORY_MB, JobConf.DISABLED_MEMORY_LIMIT)); + memSizeForReduceSlotOnJT = + JobConf.normalizeMemoryConfigValue(conf.getLong( + MRConfig.REDUCEMEMORY_MB, + JobConf.DISABLED_MEMORY_LIMIT)); + + //handling @deprecated values + if (conf.get(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY) != null) { + LOG.warn( + JobConf.deprecatedString( + JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY)+ + " instead use " + JTConfig.JT_MAX_MAPMEMORY_MB + + " and " + JTConfig.JT_MAX_REDUCEMEMORY_MB + ); + + limitMaxMemForMapTasks = limitMaxMemForReduceTasks = + JobConf.normalizeMemoryConfigValue( + conf.getLong( + JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, + JobConf.DISABLED_MEMORY_LIMIT)); + if (limitMaxMemForMapTasks != JobConf.DISABLED_MEMORY_LIMIT && + limitMaxMemForMapTasks >= 0) { + limitMaxMemForMapTasks = limitMaxMemForReduceTasks = + limitMaxMemForMapTasks / + (1024 * 1024); //Converting old values in bytes to MB + } + } else { + limitMaxMemForMapTasks = + JobConf.normalizeMemoryConfigValue( + conf.getLong( + JTConfig.JT_MAX_MAPMEMORY_MB, JobConf.DISABLED_MEMORY_LIMIT)); + limitMaxMemForReduceTasks = + JobConf.normalizeMemoryConfigValue( + conf.getLong( + JTConfig.JT_MAX_REDUCEMEMORY_MB, JobConf.DISABLED_MEMORY_LIMIT)); + } + LOG.info(String.format("Scheduler configured with " + + "(memSizeForMapSlotOnJT, memSizeForReduceSlotOnJT, " + + "limitMaxMemForMapTasks, limitMaxMemForReduceTasks)" + + " (%d,%d,%d,%d)", Long.valueOf(memSizeForMapSlotOnJT), Long + .valueOf(memSizeForReduceSlotOnJT), Long + .valueOf(limitMaxMemForMapTasks), Long + .valueOf(limitMaxMemForReduceTasks))); + } + + static long getMemSizeForMapSlot() { + return memSizeForMapSlotOnJT; + } + + static long getMemSizeForReduceSlot() { + return memSizeForReduceSlotOnJT; + } + + static long getLimitMaxMemForMapSlot() { + return limitMaxMemForMapTasks; + } + + static long getLimitMaxMemForReduceSlot() { + return limitMaxMemForReduceTasks; + } } Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ClusterWithCapacityScheduler.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ClusterWithCapacityScheduler.java?rev=885145&r1=885144&r2=885145&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ClusterWithCapacityScheduler.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ClusterWithCapacityScheduler.java Sat Nov 28 20:26:01 2009 @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; /** * A test-cluster based on {@link MiniMRCluster} that is started with @@ -102,7 +103,7 @@ setUpSchedulerConfigFile(schedulerProperties); } - clusterConf.set("mapred.jobtracker.taskScheduler", + clusterConf.set(JTConfig.JT_TASK_SCHEDULER, CapacityTaskScheduler.class.getName()); mrCluster = new MiniMRCluster(numTaskTrackers, "file:///", 1, null, null,