From: yhemanth@apache.org
To: core-commits@hadoop.apache.org
Reply-To: core-dev@hadoop.apache.org
Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm
Subject: svn commit: r778696 [2/2] - in /hadoop/core/trunk: ./ conf/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/mapred/ src/mapred/org/apache/hadoop/mapred/ src/test/ma...
Date: Tue, 26 May 2009 13:30:38 -0000
Message-Id: <20090526133039.300FD23888C2@eris.apache.org>

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=778696&r1=778695&r2=778696&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue May 26 13:30:37 2009
@@ -200,58 +200,10 @@
   private TaskMemoryManagerThread taskMemoryManager;
   private boolean taskMemoryManagerEnabled = true;
   private long totalVirtualMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
-  private long totalPmemOnTT = JobConf.DISABLED_MEMORY_LIMIT;
-  private long reservedVirtualMemory = JobConf.DISABLED_MEMORY_LIMIT;
-  private long reservedPmem = JobConf.DISABLED_MEMORY_LIMIT;
-
-  // Cluster wide default value for max-vm per task
-  private long defaultMaxVmPerTask = JobConf.DISABLED_MEMORY_LIMIT;
-  // Cluster wide upper limit on max-vm per task
-  private long limitMaxVmPerTask = JobConf.DISABLED_MEMORY_LIMIT;
-
-  /**
-   * Configuration property to specify the amount of virtual memory that has to
-   * be reserved by the TaskTracker for system usage (OS, TT etc). The reserved
-   * virtual memory should be a part of the total virtual memory available on
-   * the TaskTracker. TaskTracker obtains the total virtual memory available on
-   * the system by using a {@link MemoryCalculatorPlugin}. The total physical
-   * memory is set to {@link JobConf#DISABLED_MEMORY_LIMIT} on systems lacking a
-   * MemoryCalculatorPlugin implementation.
-   *
-   *
-   *
-   * The reserved virtual memory and the total virtual memory values are
-   * reported by the TaskTracker as part of heart-beat so that they can
-   * considered by a scheduler.
-   *
-   *
-   *
-   * These two values are also used by the TaskTracker for tracking tasks'
-   * memory usage. Memory management functionality on a TaskTracker is disabled
-   * if this property is not set, if it more than the total virtual memory
-   * reported by MemoryCalculatorPlugin, or if either of the values is negative.
-   */
-  static final String MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY =
-      "mapred.tasktracker.vmem.reserved";
-
-  /**
-   * Configuration property to specify the amount of physical memory that has to
-   * be reserved by the TaskTracker for system usage (OS, TT etc). The reserved
-   * physical memory should be a part of the total physical memory available on
-   * the TaskTracker. TaskTracker obtains the total physical memory available on
-   * the system by using a {@link MemoryCalculatorPlugin}. The total physical
-   * memory is set to {@link JobConf#DISABLED_MEMORY_LIMIT} on systems lacking a
-   * MemoryCalculatorPlugin implementation.
-   *
-   *
- * - * The reserved virtual memory and the total virtual memory values are - * reported by the TaskTracker as part of heart-beat so that they can - * considered by a scheduler. - * - */ - static final String MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY = - "mapred.tasktracker.pmem.reserved"; + private long totalPhysicalMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT; + private long mapSlotMemorySizeOnTT = JobConf.DISABLED_MEMORY_LIMIT; + private long reduceSlotSizeMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT; + private long totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT; static final String MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY = "mapred.tasktracker.memory_calculator_plugin"; @@ -1247,14 +1199,14 @@ long freeDiskSpace = getFreeSpace(); long totVmem = getTotalVirtualMemoryOnTT(); long totPmem = getTotalPhysicalMemoryOnTT(); - long rsrvdVmem = getReservedVirtualMemory(); - long rsrvdPmem = getReservedPhysicalMemory(); status.getResourceStatus().setAvailableSpace(freeDiskSpace); status.getResourceStatus().setTotalVirtualMemory(totVmem); status.getResourceStatus().setTotalPhysicalMemory(totPmem); - status.getResourceStatus().setReservedVirtualMemory(rsrvdVmem); - status.getResourceStatus().setReservedPhysicalMemory(rsrvdPmem); + status.getResourceStatus().setMapSlotMemorySizeOnTT( + mapSlotMemorySizeOnTT); + status.getResourceStatus().setReduceSlotMemorySizeOnTT( + reduceSlotSizeMemoryOnTT); } // @@ -1317,53 +1269,11 @@ * @return total size of physical memory. */ long getTotalPhysicalMemoryOnTT() { - return totalPmemOnTT; - } - - /** - * Return the amount of virtual memory reserved on the TaskTracker for system - * usage (OS, TT etc). - */ - long getReservedVirtualMemory() { - return reservedVirtualMemory; - } - - /** - * Return the amount of physical memory reserved on the TaskTracker for system - * usage (OS, TT etc). - */ - long getReservedPhysicalMemory() { - return reservedPmem; + return totalPhysicalMemoryOnTT; } - /** - * Return the limit on the maxVMemPerTask on this TaskTracker - * @return limitMaxVmPerTask - */ - long getLimitMaxVMemPerTask() { - return limitMaxVmPerTask; - } - - /** - * Obtain the virtual memory allocated for a TIP. - * - * If the TIP's job has a configured value for the max-virtual memory, that - * will be returned. Else, the cluster-wide default maxvirtual memory for - * tasks is returned. - * - * @param conf - * @return the virtual memory allocated for the TIP. - */ - long getVirtualMemoryForTask(JobConf conf) { - long vMemForTask = - normalizeMemoryConfigValue(conf.getMaxVirtualMemoryForTask()); - if (vMemForTask == JobConf.DISABLED_MEMORY_LIMIT) { - vMemForTask = - normalizeMemoryConfigValue(fConf.getLong( - JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY, - JobConf.DISABLED_MEMORY_LIMIT)); - } - return vMemForTask; + long getTotalMemoryAllottedForTasksOnTT() { + return totalMemoryAllottedForTasks; } /** @@ -1637,7 +1547,6 @@ private TaskLauncher mapLauncher; private TaskLauncher reduceLauncher; - public JvmManager getJvmManagerInstance() { return jvmManager; } @@ -1775,10 +1684,12 @@ } } - void addToMemoryManager(TaskAttemptID attemptId, + void addToMemoryManager(TaskAttemptID attemptId, boolean isMap, JobConf conf) { if (isTaskMemoryManagerEnabled()) { - taskMemoryManager.addTask(attemptId, getVirtualMemoryForTask(conf)); + taskMemoryManager.addTask(attemptId, isMap ? 
conf + .getMemoryForMapTask() * 1024 * 1024L : conf + .getMemoryForReduceTask() * 1024 * 1024L); } } @@ -3119,33 +3030,35 @@ + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT); totalVirtualMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT; } - totalPmemOnTT = memoryCalculatorPlugin.getPhysicalMemorySize(); - if (totalPmemOnTT <= 0) { + totalPhysicalMemoryOnTT = memoryCalculatorPlugin.getPhysicalMemorySize(); + if (totalPhysicalMemoryOnTT <= 0) { LOG.warn("TaskTracker's totalPmem could not be calculated. " + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT); - totalPmemOnTT = JobConf.DISABLED_MEMORY_LIMIT; + totalPhysicalMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT; } } - reservedVirtualMemory = - normalizeMemoryConfigValue(fConf.getLong( - TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, - JobConf.DISABLED_MEMORY_LIMIT)); - - reservedPmem = - normalizeMemoryConfigValue(fConf.getLong( - TaskTracker.MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY, - JobConf.DISABLED_MEMORY_LIMIT)); - - defaultMaxVmPerTask = - normalizeMemoryConfigValue(fConf.getLong( - JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY, - JobConf.DISABLED_MEMORY_LIMIT)); - - limitMaxVmPerTask = - normalizeMemoryConfigValue(fConf.getLong( - JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, - JobConf.DISABLED_MEMORY_LIMIT)); + mapSlotMemorySizeOnTT = + fConf.getLong( + JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, + JobConf.DISABLED_MEMORY_LIMIT); + reduceSlotSizeMemoryOnTT = + fConf.getLong( + JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, + JobConf.DISABLED_MEMORY_LIMIT); + totalMemoryAllottedForTasks = + maxCurrentMapTasks * mapSlotMemorySizeOnTT + maxCurrentReduceTasks + * reduceSlotSizeMemoryOnTT; + if (totalMemoryAllottedForTasks < 0) { + totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT; + } + if (totalMemoryAllottedForTasks > totalPhysicalMemoryOnTT) { + LOG.info("totalMemoryAllottedForTasks > totalPhysicalMemoryOnTT." + + " Thrashing might happen."); + } else if (totalMemoryAllottedForTasks > totalVirtualMemoryOnTT) { + LOG.info("totalMemoryAllottedForTasks > totalVirtualMemoryOnTT." + + " Thrashing might happen."); + } // start the taskMemoryManager thread only if enabled setTaskMemoryManagerEnabledFlag(); @@ -3164,55 +3077,12 @@ return; } - // /// Missing configuration - StringBuilder mesg = new StringBuilder(); - - long totalVmemOnTT = getTotalVirtualMemoryOnTT(); - if (totalVmemOnTT == JobConf.DISABLED_MEMORY_LIMIT) { - mesg.append("TaskTracker's totalVmem could not be calculated.\n"); + if (totalMemoryAllottedForTasks == JobConf.DISABLED_MEMORY_LIMIT) { taskMemoryManagerEnabled = false; - } - - long reservedVmem = getReservedVirtualMemory(); - if (reservedVmem == JobConf.DISABLED_MEMORY_LIMIT) { - mesg.append("TaskTracker's reservedVmem is not configured.\n"); - taskMemoryManagerEnabled = false; - } - - if (defaultMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT) { - mesg.append("TaskTracker's defaultMaxVmPerTask is not configured.\n"); - taskMemoryManagerEnabled = false; - } - - if (limitMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT) { - mesg.append("TaskTracker's limitMaxVmPerTask is not configured.\n"); - taskMemoryManagerEnabled = false; - } - - if (!taskMemoryManagerEnabled) { - LOG.warn(mesg.toString() + "TaskMemoryManager is disabled."); - return; - } - // ///// End of missing configuration - - // ///// Mis-configuration - if (defaultMaxVmPerTask > limitMaxVmPerTask) { - mesg.append("defaultMaxVmPerTask is mis-configured. " - + "It shouldn't be greater than limitMaxVmPerTask. 
"); - taskMemoryManagerEnabled = false; - } - - if (reservedVmem > totalVmemOnTT) { - mesg.append("reservedVmemOnTT is mis-configured. " - + "It shouldn't be greater than totalVmemOnTT"); - taskMemoryManagerEnabled = false; - } - - if (!taskMemoryManagerEnabled) { - LOG.warn(mesg.toString() + "TaskMemoryManager is disabled."); + LOG.warn("TaskTracker's totalMemoryAllottedForTasks is -1." + + " TaskMemoryManager is disabled."); return; } - // ///// End of mis-configuration taskMemoryManagerEnabled = true; } Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=778696&r1=778695&r2=778696&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Tue May 26 13:30:37 2009 @@ -55,16 +55,16 @@ static class ResourceStatus implements Writable { private long totalVirtualMemory; - private long reservedVirtualMemory; private long totalPhysicalMemory; - private long reservedPhysicalMemory; + private long mapSlotMemorySizeOnTT; + private long reduceSlotMemorySizeOnTT; private long availableSpace; ResourceStatus() { totalVirtualMemory = JobConf.DISABLED_MEMORY_LIMIT; - reservedVirtualMemory = JobConf.DISABLED_MEMORY_LIMIT; totalPhysicalMemory = JobConf.DISABLED_MEMORY_LIMIT; - reservedPhysicalMemory = JobConf.DISABLED_MEMORY_LIMIT; + mapSlotMemorySizeOnTT = JobConf.DISABLED_MEMORY_LIMIT; + reduceSlotMemorySizeOnTT = JobConf.DISABLED_MEMORY_LIMIT; availableSpace = Long.MAX_VALUE; } @@ -90,24 +90,6 @@ } /** - * Set the amount of virtual memory reserved on the TaskTracker for system - * usage (OS, TT etc). - * - * @param reservedVmem amount of virtual memory reserved in bytes. - */ - void setReservedVirtualMemory(long reservedVmem) { - reservedVirtualMemory = reservedVmem; - } - - /** - * Get the amount of virtual memory reserved on the TaskTracker for system - * usage (OS, TT etc). - */ - long getReservedTotalMemory() { - return reservedVirtualMemory; - } - - /** * Set the maximum amount of physical memory on the tasktracker. * * @param totalRAM maximum amount of physical memory on the tasktracker in @@ -130,23 +112,49 @@ } /** - * Set the amount of physical memory reserved on the TaskTracker for system - * usage (OS, TT etc). + * Set the memory size of each map slot on this TT. This will be used by JT + * for accounting more slots for jobs that use more memory. + * + * @param mem + */ + void setMapSlotMemorySizeOnTT(long mem) { + mapSlotMemorySizeOnTT = mem; + } + + /** + * Get the memory size of each map slot on this TT. See + * {@link #setMapSlotMemorySizeOnTT(long)} * - * @param reservedPmem amount of physical memory reserved in bytes. + * @return */ - void setReservedPhysicalMemory(long reservedPmem) { - reservedPhysicalMemory = reservedPmem; + long getMapSlotMemorySizeOnTT() { + return mapSlotMemorySizeOnTT; } /** - * Get the amount of physical memory reserved on the TaskTracker for system - * usage (OS, TT etc). + * Set the memory size of each reduce slot on this TT. This will be used by + * JT for accounting more slots for jobs that use more memory. 
+ * + * @param mem */ - long getReservedPhysicalMemory() { - return reservedPhysicalMemory; + void setReduceSlotMemorySizeOnTT(long mem) { + reduceSlotMemorySizeOnTT = mem; } + /** + * Get the memory size of each reduce slot on this TT. See + * {@link #setReduceSlotMemorySizeOnTT(long)} + * + * @return + */ + long getReduceSlotMemorySizeOnTT() { + return reduceSlotMemorySizeOnTT; + } + + /** + * Set the available disk space on the TT + * @param availSpace + */ void setAvailableSpace(long availSpace) { availableSpace = availSpace; } @@ -161,17 +169,17 @@ public void write(DataOutput out) throws IOException { WritableUtils.writeVLong(out, totalVirtualMemory); - WritableUtils.writeVLong(out, reservedVirtualMemory); WritableUtils.writeVLong(out, totalPhysicalMemory); - WritableUtils.writeVLong(out, reservedPhysicalMemory); + WritableUtils.writeVLong(out, mapSlotMemorySizeOnTT); + WritableUtils.writeVLong(out, reduceSlotMemorySizeOnTT); WritableUtils.writeVLong(out, availableSpace); } public void readFields(DataInput in) throws IOException { totalVirtualMemory = WritableUtils.readVLong(in); - reservedVirtualMemory = WritableUtils.readVLong(in); totalPhysicalMemory = WritableUtils.readVLong(in); - reservedPhysicalMemory = WritableUtils.readVLong(in); + mapSlotMemorySizeOnTT = WritableUtils.readVLong(in); + reduceSlotMemorySizeOnTT = WritableUtils.readVLong(in); availableSpace = WritableUtils.readVLong(in); } } Added: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java?rev=778696&view=auto ============================================================================== --- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java (added) +++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java Tue May 26 13:30:37 2009 @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +import java.io.IOException; + +import org.apache.hadoop.examples.SleepJob; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.util.ToolRunner; + +import junit.framework.TestCase; + +public class TestSubmitJob extends TestCase { + private MiniMRCluster miniMRCluster; + + @Override + protected void tearDown() + throws Exception { + if (miniMRCluster != null) { + miniMRCluster.shutdown(); + } + } + + /** + * Test to verify that jobs with invalid memory requirements are killed at the + * JT. 
+ * + * @throws Exception + */ + public void testJobWithInvalidMemoryReqs() + throws Exception { + JobConf jtConf = new JobConf(); + jtConf + .setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024L); + jtConf.setLong(JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, + 2 * 1024L); + jtConf.setLong(JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, + 3 * 1024L); + jtConf.setLong(JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, + 4 * 1024L); + + miniMRCluster = new MiniMRCluster(0, "file:///", 0, null, null, jtConf); + + JobConf clusterConf = miniMRCluster.createJobConf(); + + // No map-memory configuration + JobConf jobConf = new JobConf(clusterConf); + jobConf.setMemoryForReduceTask(1 * 1024L); + runJobAndVerifyFailure(jobConf, JobConf.DISABLED_MEMORY_LIMIT, 1 * 1024L, + "Invalid job requirements."); + + // No reduce-memory configuration + jobConf = new JobConf(clusterConf); + jobConf.setMemoryForMapTask(1 * 1024L); + runJobAndVerifyFailure(jobConf, 1 * 1024L, JobConf.DISABLED_MEMORY_LIMIT, + "Invalid job requirements."); + + // Invalid map-memory configuration + jobConf = new JobConf(clusterConf); + jobConf.setMemoryForMapTask(4 * 1024L); + jobConf.setMemoryForReduceTask(1 * 1024L); + runJobAndVerifyFailure(jobConf, 4 * 1024L, 1 * 1024L, + "Exceeds the cluster's max-memory-limit."); + + // No reduce-memory configuration + jobConf = new JobConf(clusterConf); + jobConf.setMemoryForMapTask(1 * 1024L); + jobConf.setMemoryForReduceTask(5 * 1024L); + runJobAndVerifyFailure(jobConf, 1 * 1024L, 5 * 1024L, + "Exceeds the cluster's max-memory-limit."); + } + + private void runJobAndVerifyFailure(JobConf jobConf, long memForMapTasks, + long memForReduceTasks, String expectedMsg) + throws Exception, + IOException { + String[] args = { "-m", "0", "-r", "0", "-mt", "0", "-rt", "0" }; + boolean throwsException = false; + String msg = null; + try { + ToolRunner.run(jobConf, new SleepJob(), args); + } catch (RemoteException re) { + throwsException = true; + msg = re.unwrapRemoteException().getMessage(); + } + assertTrue(throwsException); + assertNotNull(msg); + + String overallExpectedMsg = + "(" + memForMapTasks + " memForMapTasks " + memForReduceTasks + + " memForReduceTasks): " + expectedMsg; + assertTrue("Observed message - " + msg + + " - doesn't contain expected message - " + overallExpectedMsg, msg + .contains(overallExpectedMsg)); + } +} Modified: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java?rev=778696&r1=778695&r2=778696&view=diff ============================================================================== --- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java (original) +++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java Tue May 26 13:30:37 2009 @@ -22,10 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.examples.SleepJob; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.util.LinuxMemoryCalculatorPlugin; import org.apache.hadoop.util.MemoryCalculatorPlugin; import org.apache.hadoop.util.ToolRunner; @@ -46,7 +43,6 @@ static final Log LOG = LogFactory.getLog(TestTTMemoryReporting.class); - private MiniDFSCluster miniDFSCluster; private MiniMRCluster miniMRCluster; 
/** @@ -77,41 +73,42 @@ getConf().getLong("totalVmemOnTT", JobConf.DISABLED_MEMORY_LIMIT); long totalPhysicalMemoryOnTT = getConf().getLong("totalPmemOnTT", JobConf.DISABLED_MEMORY_LIMIT); - long virtualMemoryReservedOnTT = - getConf().getLong("reservedVmemOnTT", JobConf.DISABLED_MEMORY_LIMIT); - long physicalMemoryReservedOnTT = - getConf().getLong("reservedPmemOnTT", JobConf.DISABLED_MEMORY_LIMIT); + long mapSlotMemorySize = + getConf().getLong("mapSlotMemorySize", JobConf.DISABLED_MEMORY_LIMIT); + long reduceSlotMemorySize = + getConf() + .getLong("reduceSlotMemorySize", JobConf.DISABLED_MEMORY_LIMIT); long reportedTotalVirtualMemoryOnTT = status.getResourceStatus().getTotalVirtualMemory(); long reportedTotalPhysicalMemoryOnTT = status.getResourceStatus().getTotalPhysicalMemory(); - long reportedVirtualMemoryReservedOnTT = - status.getResourceStatus().getReservedTotalMemory(); - long reportedPhysicalMemoryReservedOnTT = - status.getResourceStatus().getReservedPhysicalMemory(); + long reportedMapSlotMemorySize = + status.getResourceStatus().getMapSlotMemorySizeOnTT(); + long reportedReduceSlotMemorySize = + status.getResourceStatus().getReduceSlotMemorySizeOnTT(); message = "expected memory values : (totalVirtualMemoryOnTT, totalPhysicalMemoryOnTT, " - + "virtualMemoryReservedOnTT, physicalMemoryReservedOnTT) = (" - + totalVirtualMemoryOnTT + ", " + totalPhysicalMemoryOnTT + ", " - + virtualMemoryReservedOnTT + ", " + physicalMemoryReservedOnTT - + ")"; + + "mapSlotMemSize, reduceSlotMemorySize) = (" + + totalVirtualMemoryOnTT + ", " + totalPhysicalMemoryOnTT + "," + + mapSlotMemorySize + "," + reduceSlotMemorySize + ")"; message += "\nreported memory values : (totalVirtualMemoryOnTT, totalPhysicalMemoryOnTT, " - + "virtualMemoryReservedOnTT, physicalMemoryReservedOnTT) = (" + + "reportedMapSlotMemorySize, reportedReduceSlotMemorySize) = (" + reportedTotalVirtualMemoryOnTT + ", " + reportedTotalPhysicalMemoryOnTT - + ", " - + reportedVirtualMemoryReservedOnTT - + ", " - + reportedPhysicalMemoryReservedOnTT + ")"; + + "," + + reportedMapSlotMemorySize + + "," + + reportedReduceSlotMemorySize + + ")"; LOG.info(message); if (totalVirtualMemoryOnTT != reportedTotalVirtualMemoryOnTT || totalPhysicalMemoryOnTT != reportedTotalPhysicalMemoryOnTT - || virtualMemoryReservedOnTT != reportedVirtualMemoryReservedOnTT - || physicalMemoryReservedOnTT != reportedPhysicalMemoryReservedOnTT) { + || mapSlotMemorySize != reportedMapSlotMemorySize + || reduceSlotMemorySize != reportedReduceSlotMemorySize) { hasPassed = false; } return super.assignTasks(status); @@ -132,7 +129,7 @@ TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY, DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class); setUpCluster(conf); - runSleepJob(); + runSleepJob(miniMRCluster.createJobConf()); verifyTestResults(); } finally { tearDownCluster(); @@ -149,8 +146,9 @@ JobConf conf = new JobConf(); conf.setLong("totalVmemOnTT", 4 * 1024 * 1024 * 1024L); conf.setLong("totalPmemOnTT", 2 * 1024 * 1024 * 1024L); - conf.setLong("reservedVmemOnTT", 1 * 1024 * 1024 * 1024L); - conf.setLong("reservedPmemOnTT", 512 * 1024 * 1024L); + conf.setLong("mapSlotMemorySize", 1 * 512L); + conf.setLong("reduceSlotMemorySize", 1 * 1024L); + conf.setClass( TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY, DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class); @@ -158,15 +156,17 @@ 4 * 1024 * 1024 * 1024L); conf.setLong(DummyMemoryCalculatorPlugin.MAXPMEM_TESTING_PROPERTY, 2 * 1024 * 1024 * 1024L); + 
conf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, + 512L); conf.setLong( - TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, - 1 * 1024 * 1024 * 1024L); - conf.setLong( - TaskTracker.MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY, - 512 * 1024 * 1024L); + JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1024L); + try { setUpCluster(conf); - runSleepJob(); + JobConf jobConf = miniMRCluster.createJobConf(); + jobConf.setMemoryForMapTask(1 * 1024L); + jobConf.setMemoryForReduceTask(2 * 1024L); + runSleepJob(jobConf); verifyTestResults(); } finally { tearDownCluster(); @@ -189,17 +189,10 @@ LinuxMemoryCalculatorPlugin plugin = new LinuxMemoryCalculatorPlugin(); conf.setLong("totalVmemOnTT", plugin.getVirtualMemorySize()); conf.setLong("totalPmemOnTT", plugin.getPhysicalMemorySize()); - conf.setLong("reservedVmemOnTT", 1 * 1024 * 1024 * 1024L); - conf.setLong("reservedPmemOnTT", 512 * 1024 * 1024L); - conf.setLong( - TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, - 1 * 1024 * 1024 * 1024L); - conf.setLong( - TaskTracker.MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY, - 512 * 1024 * 1024L); + try { setUpCluster(conf); - runSleepJob(); + runSleepJob(miniMRCluster.createJobConf()); verifyTestResults(); } finally { tearDownCluster(); @@ -208,22 +201,15 @@ private void setUpCluster(JobConf conf) throws Exception { - conf.setClass("mapred.jobtracker.taskScheduler", - TestTTMemoryReporting.FakeTaskScheduler.class, - TaskScheduler.class); - miniDFSCluster = new MiniDFSCluster(conf, 1, true, null); - FileSystem fileSys = miniDFSCluster.getFileSystem(); - String namenode = fileSys.getUri().toString(); - miniMRCluster = new MiniMRCluster(1, namenode, 3, - null, null, conf); + conf.setClass("mapred.jobtracker.taskScheduler", + TestTTMemoryReporting.FakeTaskScheduler.class, TaskScheduler.class); + conf.set("mapred.job.tracker.handler.count", "1"); + miniMRCluster = new MiniMRCluster(1, "file:///", 3, null, null, conf); } - private void runSleepJob() throws Exception { - Configuration conf = new Configuration(); - conf.set("mapred.job.tracker", "localhost:" - + miniMRCluster.getJobTrackerPort()); + private void runSleepJob(JobConf conf) throws Exception { String[] args = { "-m", "1", "-r", "1", - "-mt", "1000", "-rt", "1000" }; + "-mt", "10", "-rt", "10" }; ToolRunner.run(conf, new SleepJob(), args); } @@ -235,7 +221,8 @@ } private void tearDownCluster() { - if (miniMRCluster != null) { miniMRCluster.shutdown(); } - if (miniDFSCluster != null) { miniDFSCluster.shutdown(); } + if (miniMRCluster != null) { + miniMRCluster.shutdown(); + } } -} \ No newline at end of file +} Modified: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java?rev=778696&r1=778695&r2=778696&view=diff ============================================================================== --- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java (original) +++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java Tue May 26 13:30:37 2009 @@ -19,20 +19,19 @@ package org.apache.hadoop.mapred; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.regex.Pattern; import java.util.regex.Matcher; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import 
org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.examples.SleepJob; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.util.MemoryCalculatorPlugin; import org.apache.hadoop.util.ProcfsBasedProcessTree; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.ToolRunner; -import org.apache.hadoop.fs.FileSystem; import junit.framework.TestCase; @@ -43,18 +42,19 @@ private static final Log LOG = LogFactory.getLog(TestTaskTrackerMemoryManager.class); - private MiniDFSCluster miniDFSCluster; private MiniMRCluster miniMRCluster; private String taskOverLimitPatternString = "TaskTree \\[pid=[0-9]*,tipID=.*\\] is running beyond memory-limits. " + "Current usage : [0-9]*bytes. Limit : %sbytes. Killing task."; - private void startCluster(JobConf conf) throws Exception { - miniDFSCluster = new MiniDFSCluster(conf, 1, true, null); - FileSystem fileSys = miniDFSCluster.getFileSystem(); - String namenode = fileSys.getUri().toString(); - miniMRCluster = new MiniMRCluster(1, namenode, 1, null, null, conf); + private void startCluster(JobConf conf) + throws Exception { + conf.set("mapred.job.tracker.handler.count", "1"); + conf.set("mapred.tasktracker.map.tasks.maximum", "1"); + conf.set("mapred.tasktracker.reduce.tasks.maximum", "1"); + conf.set("mapred.tasktracker.tasks.sleeptime-before-sigkill", "0"); + miniMRCluster = new MiniMRCluster(1, "file:///", 1, null, null, conf); } @Override @@ -62,9 +62,6 @@ if (miniMRCluster != null) { miniMRCluster.shutdown(); } - if (miniDFSCluster != null) { - miniDFSCluster.shutdown(); - } } private int runSleepJob(JobConf conf) throws Exception { @@ -74,15 +71,6 @@ private void runAndCheckSuccessfulJob(JobConf conf) throws IOException { - // Set up job. - JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker(); - conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":" - + jt.getTrackerPort()); - NameNode nn = miniDFSCluster.getNameNode(); - conf.set("fs.default.name", "hdfs://" - + nn.getNameNodeAddress().getHostName() + ":" - + nn.getNameNodeAddress().getPort()); - Pattern taskOverLimitPattern = Pattern.compile(String.format(taskOverLimitPatternString, "[0-9]*")); Matcher mat = null; @@ -148,43 +136,12 @@ return; } - JobConf conf = new JobConf(); // Task-memory management disabled by default. - startCluster(conf); - long PER_TASK_LIMIT = 100L; // Doesn't matter how low. - conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT); - runAndCheckSuccessfulJob(conf); - } - - /** - * Test for verifying that tasks with no limits, with the cumulative usage - * still under TT's limits, succeed. - * - * @throws Exception - */ - public void testTasksWithNoLimits() - throws Exception { - // Run the test only if memory management is enabled - if (!isProcfsBasedTreeAvailable()) { - return; - } - - // Fairly large value for sleepJob to succeed - long ttLimit = 4 * 1024 * 1024 * 1024L; - // Start cluster with proper configuration. 
- JobConf fConf = new JobConf(); - - fConf.setClass( - TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY, - DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class); - fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY, - ttLimit); - fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY, ttLimit); - fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, ttLimit); - fConf.setLong( - TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0); - startCluster(fConf); - JobConf conf = new JobConf(); + startCluster(new JobConf()); + long PER_TASK_LIMIT = 1L; // Doesn't matter how low. + JobConf conf = miniMRCluster.createJobConf(); + conf.setMemoryForMapTask(PER_TASK_LIMIT); + conf.setMemoryForReduceTask(PER_TASK_LIMIT); runAndCheckSuccessfulJob(conf); } @@ -202,33 +159,25 @@ } // Large so that sleepjob goes through and fits total TT usage - long PER_TASK_LIMIT = 2 * 1024 * 1024 * 1024L; - long TASK_TRACKER_LIMIT = 4 * 1024 * 1024 * 1024L; + long PER_TASK_LIMIT = 2 * 1024L; // Start cluster with proper configuration. JobConf fConf = new JobConf(); - - fConf.setClass( - TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY, - DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class); - fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY, - TASK_TRACKER_LIMIT); - fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY, - TASK_TRACKER_LIMIT); - fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, - TASK_TRACKER_LIMIT); + fConf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, + 2 * 1024L); fConf.setLong( - TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0); - startCluster(fConf); - JobConf conf = new JobConf(); - conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT); + JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, + 2 * 1024L); + startCluster(new JobConf()); + + JobConf conf = new JobConf(miniMRCluster.createJobConf()); + conf.setMemoryForMapTask(PER_TASK_LIMIT); + conf.setMemoryForReduceTask(PER_TASK_LIMIT); runAndCheckSuccessfulJob(conf); - } /** - * Test for verifying that tasks that go beyond limits, though the cumulative - * usage is under TT's limits, get killed. + * Test for verifying that tasks that go beyond limits get killed. * * @throws Exception */ @@ -240,43 +189,32 @@ return; } - long PER_TASK_LIMIT = 444; // Low enough to kill off sleepJob tasks. - long TASK_TRACKER_LIMIT = 4 * 1024 * 1024 * 1024L; // Large so as to fit - // total usage + long PER_TASK_LIMIT = 1L; // Low enough to kill off sleepJob tasks. + Pattern taskOverLimitPattern = Pattern.compile(String.format(taskOverLimitPatternString, String - .valueOf(PER_TASK_LIMIT))); + .valueOf(PER_TASK_LIMIT*1024*1024L))); Matcher mat = null; // Start cluster with proper configuration. JobConf fConf = new JobConf(); - fConf.setClass( - TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY, - DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class); - fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY, - TASK_TRACKER_LIMIT); - fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY, - TASK_TRACKER_LIMIT); - fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, - TASK_TRACKER_LIMIT); - fConf.setLong( - TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0); // very small value, so that no task escapes to successful completion. 
fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval", String.valueOf(300)); + fConf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, + 2 * 1024); + fConf.setLong( + JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, + 2 * 1024); startCluster(fConf); // Set up job. - JobConf conf = new JobConf(); - conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT); - JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker(); - conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":" - + jt.getTrackerPort()); - NameNode nn = miniDFSCluster.getNameNode(); - conf.set("fs.default.name", "hdfs://" - + nn.getNameNodeAddress().getHostName() + ":" - + nn.getNameNodeAddress().getPort()); + JobConf conf = new JobConf(miniMRCluster.createJobConf()); + conf.setMemoryForMapTask(PER_TASK_LIMIT); + conf.setMemoryForReduceTask(PER_TASK_LIMIT); + conf.setMaxMapAttempts(1); + conf.setMaxReduceAttempts(1); // Start the job. int ret = 0; @@ -334,48 +272,39 @@ } // Large enough for SleepJob Tasks. - long PER_TASK_LIMIT = 100000000000L; - // Very Limited TT. All tasks will be killed. - long TASK_TRACKER_LIMIT = 100L; - Pattern taskOverLimitPattern = - Pattern.compile(String.format(taskOverLimitPatternString, String - .valueOf(PER_TASK_LIMIT))); - Pattern trackerOverLimitPattern = - Pattern - .compile("Killing one of the least progress tasks - .*, as " - + "the cumulative memory usage of all the tasks on the TaskTracker" - + " exceeds virtual memory limit " + TASK_TRACKER_LIMIT + "."); - Matcher mat = null; + long PER_TASK_LIMIT = 100 * 1024L; // Start cluster with proper configuration. JobConf fConf = new JobConf(); - fConf.setClass( - TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY, - DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class); - fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY, - TASK_TRACKER_LIMIT); - fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY, - TASK_TRACKER_LIMIT); - fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, - TASK_TRACKER_LIMIT); + fConf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, + 1L); fConf.setLong( - TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0); + JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1L); + + // Because of the above, the total tt limit is 2mb + long TASK_TRACKER_LIMIT = 2 * 1024 * 1024L; + // very small value, so that no task escapes to successful completion. fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval", String.valueOf(300)); startCluster(fConf); + Pattern taskOverLimitPattern = + Pattern.compile(String.format(taskOverLimitPatternString, String + .valueOf(PER_TASK_LIMIT))); + + Pattern trackerOverLimitPattern = + Pattern + .compile("Killing one of the least progress tasks - .*, as " + + "the cumulative memory usage of all the tasks on the TaskTracker" + + " exceeds virtual memory limit " + TASK_TRACKER_LIMIT + "."); + Matcher mat = null; + // Set up job. 
- JobConf conf = new JobConf(); - conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT); - JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker(); - conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":" - + jt.getTrackerPort()); - NameNode nn = miniDFSCluster.getNameNode(); - conf.set("fs.default.name", "hdfs://" - + nn.getNameNodeAddress().getHostName() + ":" - + nn.getNameNodeAddress().getPort()); + JobConf conf = new JobConf(miniMRCluster.createJobConf()); + conf.setMemoryForMapTask(PER_TASK_LIMIT); + conf.setMemoryForReduceTask(PER_TASK_LIMIT); JobClient jClient = new JobClient(conf); SleepJob sleepJob = new SleepJob(); @@ -385,10 +314,12 @@ job.submit(); boolean TTOverFlowMsgPresent = false; while (true) { - // Set-up tasks are the first to be launched. - TaskReport[] setUpReports = jClient.getSetupTaskReports( - (org.apache.hadoop.mapred.JobID)job.getID()); - for (TaskReport tr : setUpReports) { + List allTaskReports = new ArrayList(); + allTaskReports.addAll(Arrays.asList(jClient + .getSetupTaskReports((org.apache.hadoop.mapred.JobID) job.getID()))); + allTaskReports.addAll(Arrays.asList(jClient + .getMapTaskReports((org.apache.hadoop.mapred.JobID) job.getID()))); + for (TaskReport tr : allTaskReports) { String[] diag = tr.getDiagnostics(); for (String str : diag) { mat = taskOverLimitPattern.matcher(str);
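
For context, the sketch below is not part of r778696; it only illustrates how the memory settings introduced by this change fit together from a client's point of view. The property names, the JobConf setters, the rejection behaviour and the MB-to-bytes conversion are taken from the diff above; the package/class wrapper and the concrete values are illustrative assumptions, and the snippet presumes a cluster (or a local/MiniMRCluster job runner) is available to accept the submission.

// Sketch only -- not part of this commit. Placed in org.apache.hadoop.mapred
// (like the new tests) so the JobTracker property names are visible; the
// class name and numbers are illustrative assumptions.
package org.apache.hadoop.mapred;

import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.util.ToolRunner;

public class MemoryConfigSketch {
  public static void main(String[] args) throws Exception {
    // Cluster side: per-slot memory sizes and per-job upper limits, in MB.
    JobConf clusterConf = new JobConf();
    clusterConf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024L);
    clusterConf.setLong(JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 2 * 1024L);
    clusterConf.setLong(JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, 3 * 1024L);
    clusterConf.setLong(JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, 4 * 1024L);

    // Job side: per-task memory in MB. With the cluster limits above in place,
    // leaving either value unset or exceeding the max limits causes the
    // JobTracker to reject the job (this is what TestSubmitJob verifies).
    JobConf jobConf = new JobConf(clusterConf);
    jobConf.setMemoryForMapTask(1 * 1024L);
    jobConf.setMemoryForReduceTask(2 * 1024L);

    // On the TaskTracker, the per-slot sizes feed the total allotment:
    //   totalMemoryAllottedForTasks =
    //       maxCurrentMapTasks * mapSlotMemorySizeOnTT
    //     + maxCurrentReduceTasks * reduceSlotSizeMemoryOnTT;
    // and each launched task is registered with the TaskMemoryManager using
    // its job's own limit converted to bytes:
    //   conf.getMemoryForMapTask() * 1024 * 1024L      (map tasks)
    //   conf.getMemoryForReduceTask() * 1024 * 1024L   (reduce tasks)

    ToolRunner.run(jobConf, new SleepJob(),
        new String[] { "-m", "1", "-r", "1", "-mt", "10", "-rt", "10" });
  }
}

The net effect of the change, as the diff shows, is that the TaskTracker no longer tracks reserved virtual/physical memory; it reports the per-slot map and reduce memory sizes in its heartbeat for the scheduler to use, while the TaskMemoryManager enforces each job's per-task limits.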