hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r689064 [5/5] - in /hadoop/core/trunk: ./ conf/ docs/ src/docs/src/documentation/content/xdocs/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Date Tue, 26 Aug 2008 13:07:45 GMT
Modified: hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml?rev=689064&r1=689063&r2=689064&view=diff
==============================================================================
--- hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml (original)
+++ hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml Tue Aug 26 06:07:43 2008
@@ -1082,7 +1082,9 @@
         </p>
         
         <p>Users/admins can also specify the maximum virtual memory 
-        of the launched child-task using <code>mapred.child.ulimit</code>.
+        of the launched child-task, and any sub-process it launches 
+        recursively, using <code>mapred.child.ulimit</code>. Note that
+        the value set here is a per-process limit.
         The value for <code>mapred.child.ulimit</code> should be specified 
         in kilobytes (KB). The value must also be greater than
         or equal to the -Xmx passed to JavaVM, else the VM might not start. 
@@ -1094,6 +1096,27 @@
         <a href="cluster_setup.html#Configuring+the+Environment+of+the+Hadoop+Daemons">
         cluster_setup.html </a></p>
         
+        <p>There are two additional parameters that influence virtual memory
+        limits for tasks run on a tasktracker. The parameter 
+        <code>mapred.tasktracker.tasks.maxmemory</code> is set by admins
+        to limit the total memory that all tasks run by a tasktracker can
+        use together. Setting it enables the parameter
+        <code>mapred.task.maxmemory</code>, which can be used to specify the
+        maximum virtual memory required by the entire process tree starting
+        from the launched child-task. This is a cumulative limit over all
+        processes in the process tree. By specifying this value, users can be
+        assured that the system will run their tasks only on tasktrackers
+        that have at least this amount of free memory available. If this
+        limit is exceeded at any time during task execution, the task will
+        be killed by the system. By default, each task gets an equal share of
+        <code>mapred.tasktracker.tasks.maxmemory</code>, that is, the value
+        divided by the total number of map and reduce slots on the
+        tasktracker. Users can thus check whether their tasks need more
+        memory than this and, if so, specify it in
+        <code>mapred.task.maxmemory</code>. This value must be greater than
+        the maximum heap size of the child JVM specified via
+        <code>mapred.child.java.opts</code>, and greater than any ulimit
+        value specified in <code>mapred.child.ulimit</code>. </p>
+        
         <p>The task tracker has local directory,
         <code> ${mapred.local.dir}/taskTracker/</code> to create localized
         cache and localized job. It can define multiple local directories 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java?rev=689064&r1=689063&r2=689064&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java Tue Aug 26 06:07:43 2008
@@ -105,6 +105,12 @@
   private static final Log LOG = LogFactory.getLog(JobConf.class);
 
   /**
+   * Sentinel value which, when set for a memory related configuration
+   * option ({@code mapred.tasktracker.tasks.maxmemory} or
+   * {@code mapred.task.maxmemory}), indicates that the option is turned off.
+   * It is also the default returned when either option is not configured.
+   */
+  public static final long DISABLED_VIRTUAL_MEMORY_LIMIT = -1L;
+  
+  /**
    * Construct a map/reduce job configuration.
    */
   public JobConf() {}
@@ -1285,6 +1291,67 @@
     return get("job.local.dir");
   }
   
+  /** Configuration key read by {@link #getMaxVirtualMemoryForTasks()}. */
+  private static final String MAX_VMEM_FOR_TASKS_PROPERTY =
+    "mapred.tasktracker.tasks.maxmemory";
+
+  /** Configuration key read by {@link #getMaxVirtualMemoryForTask()}. */
+  private static final String MAX_VMEM_FOR_TASK_PROPERTY =
+    "mapred.task.maxmemory";
+
+  /**
+   * Get the maximum amount of virtual memory that all tasks running on a
+   * tasktracker, including the sub-processes they launch, can use together.
+   *  
+   * This value is used to compute the amount of free memory 
+   * available for tasks. Any task scheduled on this tasktracker is 
+   * guaranteed and constrained to use a share of this amount. Any task 
+   * exceeding its share will be killed.
+   * 
+   * If set to {@link #DISABLED_VIRTUAL_MEMORY_LIMIT}, this functionality 
+   * is disabled.
+   * 
+   * @return maximum amount of virtual memory to divide among all tasks on
+   *         the tasktracker (key {@code mapred.tasktracker.tasks.maxmemory}),
+   *         or {@link #DISABLED_VIRTUAL_MEMORY_LIMIT} if it is not set.
+   * @see #getMaxVirtualMemoryForTask()
+   */
+  public long getMaxVirtualMemoryForTasks() {
+    return getLong(MAX_VMEM_FOR_TASKS_PROPERTY, DISABLED_VIRTUAL_MEMORY_LIMIT);
+  }
+  
+  /**
+   * Set the maximum amount of virtual memory that all tasks running on a
+   * tasktracker, including the sub-processes they launch, can use together.
+   * 
+   * @param vmem maximum amount of virtual memory that can be used, or
+   *             {@link #DISABLED_VIRTUAL_MEMORY_LIMIT} to disable the limit.
+   * @see #getMaxVirtualMemoryForTasks()
+   */
+  public void setMaxVirtualMemoryForTasks(long vmem) {
+    setLong(MAX_VMEM_FOR_TASKS_PROPERTY, vmem);
+  }
+  
+  /**
+   * Get the maximum amount of memory any task of this job will use.
+   * 
+   * A task of this job will be scheduled on a tasktracker only if the
+   * amount of free memory on the tasktracker is greater than 
+   * or equal to this value.
+   * 
+   * If set to {@link #DISABLED_VIRTUAL_MEMORY_LIMIT}, tasks are assured 
+   * a memory limit on the tasktracker equal to
+   * {@code mapred.tasktracker.tasks.maxmemory} / number of slots. If
+   * {@code mapred.tasktracker.tasks.maxmemory} is itself set to
+   * {@link #DISABLED_VIRTUAL_MEMORY_LIMIT}, this value is ignored.
+   * 
+   * @return the maximum amount of memory any task of this job will use
+   *         (key {@code mapred.task.maxmemory}), or
+   *         {@link #DISABLED_VIRTUAL_MEMORY_LIMIT} if it is not set.
+   * @see #getMaxVirtualMemoryForTasks()
+   */
+  public long getMaxVirtualMemoryForTask() {
+    return getLong(MAX_VMEM_FOR_TASK_PROPERTY, DISABLED_VIRTUAL_MEMORY_LIMIT);
+  }
+  
+  /**
+   * Set the maximum amount of memory any task of this job can use.
+   * 
+   * @param vmem maximum amount of memory any task of this job can use, or
+   *             {@link #DISABLED_VIRTUAL_MEMORY_LIMIT} to use the
+   *             tasktracker's default per-task share.
+   * @see #getMaxVirtualMemoryForTask()
+   */
+  public void setMaxVirtualMemoryForTask(long vmem) {
+    setLong(MAX_VMEM_FOR_TASK_PROPERTY, vmem);
+  }
+    
   /** 
    * Find a jar that contains a class of the same name, if any.
    * It will return a jar file, even if that is not the first thing

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=689064&r1=689063&r2=689064&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Tue Aug 26 06:07:43 2008
@@ -133,7 +133,8 @@
   private boolean hasSpeculativeMaps;
   private boolean hasSpeculativeReduces;
   private long inputLength = 0;
-
+  private long maxVirtualMemoryForTask;
+  
   // Per-job counters
   public static enum Counter { 
     NUM_FAILED_MAPS, 
@@ -225,6 +226,7 @@
     this.nonRunningReduces = new LinkedList<TaskInProgress>();    
     this.runningReduces = new LinkedHashSet<TaskInProgress>();
     this.resourceEstimator = new ResourceEstimator(this);
+    this.maxVirtualMemoryForTask = conf.getMaxVirtualMemoryForTask();
   }
 
   /**
@@ -441,6 +443,11 @@
       this.priority = priority;
     }
   }
+
+  // Accessors for resources.
+
+  /**
+   * Get the per-task virtual memory requirement of this job. The value is
+   * cached from {@link JobConf#getMaxVirtualMemoryForTask()} when this
+   * object is initialized.
+   *
+   * @return the job's {@code mapred.task.maxmemory}, or
+   *         {@link JobConf#DISABLED_VIRTUAL_MEMORY_LIMIT} if it is not set
+   */
+  public long getMaxVirtualMemoryForTask() {
+    return this.maxVirtualMemoryForTask;
+  }
   
   long getInputLength() {
     return inputLength;
@@ -1089,9 +1096,10 @@
     
 
     long outSize = resourceEstimator.getEstimatedMapOutputSize();
-    if(tts.getAvailableSpace() < outSize) {
+    long availSpace = tts.getResourceStatus().getAvailableSpace();
+    if(availSpace < outSize) {
       LOG.warn("No room for map task. Node " + node + 
-               " has " + tts.getAvailableSpace() + 
+               " has " + availSpace + 
                " bytes free; but we expect map to take " + outSize);
 
       return -1; //see if a different TIP might work better. 
@@ -1295,9 +1303,10 @@
     }
 
     long outSize = resourceEstimator.getEstimatedReduceInputSize();
-    if(tts.getAvailableSpace() < outSize) {
+    long availSpace = tts.getResourceStatus().getAvailableSpace();
+    if(availSpace < outSize) {
       LOG.warn("No room for reduce task. Node " + taskTracker + " has " +
-               tts.getAvailableSpace() + 
+                availSpace + 
                " bytes free; but we expect reduce input to take " + outSize);
 
       return -1; //see if a different TIP might work better. 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=689064&r1=689063&r2=689064&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Tue Aug 26 06:07:43 2008
@@ -1653,7 +1653,15 @@
     TaskInProgress tip = getTip(tipid);
     return (tip == null ? null : tip.getCounters());
   }
-    
+
+  /**
+   * Get the {@link TaskScheduler} configured for this job tracker.
+   *
+   * @return the configured task scheduler
+   */
+  TaskScheduler getTaskScheduler() {
+    return this.taskScheduler;
+  }
+  
   /**
    * Returns specified TaskInProgress, or null.
    */

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=689064&r1=689063&r2=689064&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue Aug 26 06:07:43 2008
@@ -177,6 +177,11 @@
   private MapEventsFetcherThread mapEventsFetcher;
   int workerThreads;
   private CleanupQueue directoryCleanupThread;
+  private long maxVirtualMemoryForTasks 
+                                    = JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT;
+  private long defaultMemoryPerTask = JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT;
+  
+  
   /**
    * the minimum interval between jobtracker polls
    */
@@ -432,6 +437,13 @@
                              "Map-events fetcher for all reduce tasks " + "on " + 
                              taskTrackerName);
     mapEventsFetcher.start();
+    maxVirtualMemoryForTasks = fConf.getMaxVirtualMemoryForTasks();
+    if (maxVirtualMemoryForTasks != 
+                JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT) {
+      defaultMemoryPerTask = maxVirtualMemoryForTasks /
+                                    (maxCurrentMapTasks + 
+                                        maxCurrentReduceTasks);
+    }
     this.running = true;
   }
   
@@ -704,7 +716,11 @@
     }
     launchTaskForJob(tip, new JobConf(rjob.jobFile)); 
   }
-    
+
+  /**
+   * Get the default virtual memory share for a task on this tracker.
+   *
+   * @return the tracker-wide limit divided among all map and reduce slots,
+   *         or {@link JobConf#DISABLED_VIRTUAL_MEMORY_LIMIT} if no
+   *         tracker-wide limit is configured
+   */
+  private long getDefaultMemoryPerTask() {
+    return this.defaultMemoryPerTask;
+  }
+
   private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{
     synchronized (tip) {
       try {
@@ -1034,7 +1050,12 @@
     if (askForNewTask) {
       checkLocalDirs(fConf.getLocalDirs());
       askForNewTask = enoughFreeSpace(localMinSpaceStart);
-      status.setAvailableSpace( getFreeSpace() );
+      status.getResourceStatus().setAvailableSpace( getFreeSpace() );
+      long freeVirtualMem = findFreeVirtualMemory();
+      LOG.debug("Setting amount of free virtual memory for the new task: " +
+                    freeVirtualMem);
+      status.getResourceStatus().setFreeVirtualMemory(freeVirtualMem);
+      status.getResourceStatus().setDefaultVirtualMemoryPerTask(getDefaultMemoryPerTask());
     
     }
       
     //
@@ -1080,6 +1101,68 @@
   }
 
   /**
+   * Get the maximum amount of virtual memory available to all tasks on
+   * this tracker, as read from the tracker's configuration.
+   *
+   * @return the maximum amount of virtual memory, or
+   *         {@link JobConf#DISABLED_VIRTUAL_MEMORY_LIMIT} if not configured
+   */
+  long getMaxVirtualMemoryForTasks() {
+    return this.maxVirtualMemoryForTasks;
+  }
+  
+  /**
+   * Find the minimum amount of virtual memory that is assured to be
+   * available for a new task.
+   * 
+   * This is the tracker-wide limit (mapred.tasktracker.tasks.maxmemory)
+   * minus the memory reserved by every task that still occupies a slot,
+   * i.e. is RUNNING, UNASSIGNED or COMMIT_PENDING. Each such task reserves
+   * its job's mapred.task.maxmemory, or the tracker's default per-task
+   * share when the job does not set one.
+   * 
+   * The result is clamped at zero. An over-committed tracker must not
+   * report a negative value, because -1 is the sentinel
+   * JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT, which consumers of the reported
+   * free memory are told to ignore.
+   * 
+   * @return amount of free virtual memory that can be assured for new
+   *         tasks (never negative), or JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT
+   *         if no tracker-wide limit is configured
+   */
+  private synchronized long findFreeVirtualMemory() {
+  
+    if (maxVirtualMemoryForTasks == JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT) {
+      // this will disable picking up tasks based on free memory.
+      return JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT;
+    }
+  
+    long maxMemoryUsed = 0L;
+    for (TaskInProgress tip : runningTasks.values()) {
+      // In these states the slot is still occupied, so the task's memory
+      // must be accounted as used. Read the state once so all three
+      // comparisons see the same value.
+      TaskStatus.State state = tip.getRunState();
+      if (state == TaskStatus.State.RUNNING
+          || state == TaskStatus.State.UNASSIGNED
+          || state == TaskStatus.State.COMMIT_PENDING) {
+        maxMemoryUsed += getMemoryForTask(tip);
+      }
+    }
+  
+    // Clamp so an over-committed tracker never reports the -1 sentinel.
+    return Math.max(0L, maxVirtualMemoryForTasks - maxMemoryUsed);
+  }
+
+  /**
+   * Get the virtual memory reserved for a TIP.
+   * 
+   * This is the maximum memory configured by the TIP's job when the job
+   * sets one; otherwise it is this tracker's default per-task share.
+   * 
+   * @param tip The TaskInProgress
+   * @return the memory reserved for the TIP.
+   */
+  private long getMemoryForTask(TaskInProgress tip) {
+    long configured = tip.getJobConf().getMaxVirtualMemoryForTask();
+    return (configured != JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT)
+        ? configured
+        : this.getDefaultMemoryPerTask();
+  }
+  
+  
+  /**
    * Check if the jobtracker directed a 'reset' of the tasktracker.
    * 
    * @param actions the directives of the jobtracker for the tasktracker.

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=689064&r1=689063&r2=689064&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Tue Aug 26 06:07:43 2008
@@ -48,13 +48,98 @@
   volatile long lastSeen;
   private int maxMapTasks;
   private int maxReduceTasks;
-  long availableSpace; //space available on this node
+   
+  /**
+   * Class representing a collection of resources on this tasktracker:
+   * free virtual memory, default virtual memory per task, and available
+   * local disk space.
+   */
+  static class ResourceStatus implements Writable {
+    
+    private long freeVirtualMemory;
+    private long defaultVirtualMemoryPerTask;
+    private long availableSpace;
+    
+    /**
+     * Create a status in which nothing has been measured yet. Both memory
+     * values are {@link JobConf#DISABLED_VIRTUAL_MEMORY_LIMIT}, and the
+     * available space is {@link Long#MAX_VALUE}.
+     */
+    ResourceStatus() {
+      freeVirtualMemory = JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT;
+      defaultVirtualMemoryPerTask = JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT;
+      availableSpace = Long.MAX_VALUE;
+    }
+    
+    /**
+     * Set the amount of free virtual memory that is available for running
+     * a new task.
+     * @param freeVmem amount of free virtual memory
+     */
+    void setFreeVirtualMemory(long freeVmem) {
+      freeVirtualMemory = freeVmem;
+    }
+
+    /**
+     * Get the amount of free virtual memory that will be available for
+     * running a new task. 
+     * 
+     * If this is {@link JobConf#DISABLED_VIRTUAL_MEMORY_LIMIT}, it should 
+     * be ignored and not used in computation.
+     * 
+     * @return amount of free virtual memory.
+     */
+    long getFreeVirtualMemory() {
+      return freeVirtualMemory;
+    }
+
+    /**
+     * Set the default amount of virtual memory per task.
+     * @param defaultVmem default amount of virtual memory per task.
+     */
+    void setDefaultVirtualMemoryPerTask(long defaultVmem) {
+      defaultVirtualMemoryPerTask = defaultVmem;
+    }
+    
+    /**
+     * Get the default amount of virtual memory per task.
+     * 
+     * This amount applies when a task's job does not specify any virtual
+     * memory itself. If this is
+     * {@link JobConf#DISABLED_VIRTUAL_MEMORY_LIMIT}, it should be ignored 
+     * and not used in any computation.
+     * 
+     * @return default amount of virtual memory per task. 
+     */    
+    long getDefaultVirtualMemoryPerTask() {
+      return defaultVirtualMemoryPerTask;
+    }
+    
+    /**
+     * Set the amount of local disk space available on this tasktracker.
+     * @param availSpace available local disk space, in bytes.
+     */
+    void setAvailableSpace(long availSpace) {
+      availableSpace = availSpace;
+    }
     
+    /**
+     * Will return {@link Long#MAX_VALUE} if space hasn't been measured yet.
+     * @return bytes of available local disk space on this tasktracker.
+     */    
+    long getAvailableSpace() {
+      return availableSpace;
+    }
+    
+    /**
+     * Serialize the three values as variable-length longs. The order is
+     * free virtual memory, default virtual memory per task, then available
+     * space; it must match {@link #readFields(DataInput)}.
+     */
+    public void write(DataOutput out) throws IOException {
+      WritableUtils.writeVLong(out, freeVirtualMemory);
+      WritableUtils.writeVLong(out, defaultVirtualMemoryPerTask);
+      WritableUtils.writeVLong(out, availableSpace);
+    }
+    
+    /**
+     * Deserialize the values in the order written by
+     * {@link #write(DataOutput)}.
+     */
+    public void readFields(DataInput in) throws IOException {
+      freeVirtualMemory = WritableUtils.readVLong(in);
+      defaultVirtualMemoryPerTask = WritableUtils.readVLong(in);
+      availableSpace = WritableUtils.readVLong(in);
+    }
+  }
+  
+  private ResourceStatus resStatus;
+  
   /**
+   * Create an empty status with no task reports. Every resource value
+   * starts out unmeasured (see the ResourceStatus constructor).
    */
   public TaskTrackerStatus() {
     taskReports = new ArrayList<TaskStatus>();
-    this.availableSpace = Long.MAX_VALUE; //not measured by default.
+    resStatus = new ResourceStatus();
   }
 
   /**
@@ -71,7 +156,7 @@
     this.failures = failures;
     this.maxMapTasks = maxMapTasks;
     this.maxReduceTasks = maxReduceTasks;
-    this.availableSpace = Long.MAX_VALUE; //not measured by default.
+    this.resStatus = new ResourceStatus();
   }
 
   /**
@@ -171,18 +256,15 @@
   }  
   
   /**
-   * Will return LONG_MAX if space hasn't been measured yet.
-   * @return bytes of available local disk space on this tasktracker.
+   * Return the {@link ResourceStatus} object configured with this
+   * status.
+   * 
+   * @return the resource status
    */
-  public long getAvailableSpace() {
-    return availableSpace;
-  }
-  
-  public void setAvailableSpace(long a) {
-    availableSpace = a;
+  ResourceStatus getResourceStatus() {
+    // Returned directly (not copied); callers such as the tasktracker
+    // update it in place before sending the status.
+    return this.resStatus;
   }
   
-  
   ///////////////////////////////////////////
   // Writable
   ///////////////////////////////////////////
@@ -193,8 +275,9 @@
     out.writeInt(failures);
     out.writeInt(maxMapTasks);
     out.writeInt(maxReduceTasks);
+    resStatus.write(out);
     out.writeInt(taskReports.size());
-    out.writeLong(availableSpace);
+
     for (TaskStatus taskStatus : taskReports) {
       TaskStatus.writeTaskStatus(out, taskStatus);
     }
@@ -207,9 +290,10 @@
     this.failures = in.readInt();
     this.maxMapTasks = in.readInt();
     this.maxReduceTasks = in.readInt();
+    resStatus.readFields(in);
     taskReports.clear();
     int numTasks = in.readInt();
-    this.availableSpace = in.readLong();
+
     for (int i = 0; i < numTasks; i++) {
       taskReports.add(TaskStatus.readTaskStatus(in));
     }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=689064&r1=689063&r2=689064&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Tue Aug 26 06:07:43 2008
@@ -73,7 +73,11 @@
     public int getJobTrackerInfoPort() {
       return tracker.getInfoPort();
     }
-        
+  
+    /**
+     * Get the JobTracker instance held by this runner.
+     *
+     * @return the job tracker
+     */
+    public JobTracker getJobTracker() {
+      return tracker;
+    }
+    
     /**
      * Create the job tracker and run it.
      */
@@ -116,12 +120,18 @@
     volatile boolean isDead = false;
     int numDir;
 
-    TaskTrackerRunner(int trackerId, int numDir, String hostname) 
+    TaskTrackerRunner(int trackerId, int numDir, String hostname, 
+                                    JobConf cfg) 
     throws IOException {
       this.trackerId = trackerId;
       this.numDir = numDir;
       localDirs = new String[numDir];
-      JobConf conf = createJobConf();
+      JobConf conf = null;
+      if (cfg == null) {
+        conf = createJobConf();
+      } else {
+        conf = createJobConf(cfg);
+      }
       if (hostname != null) {
         conf.set("slave.host.name", hostname);
       }
@@ -216,6 +226,10 @@
             taskTrackerList.get(taskTracker)).getLocalDir();
   }
 
+  /**
+   * Get the runner for this cluster's job tracker, e.g. to reach the
+   * {@link JobTracker} instance from a test.
+   *
+   * @return the job tracker runner
+   */
+  public JobTrackerRunner getJobTrackerRunner() {
+    return jobTracker;
+  }
+  
   /**
    * Get the number of task trackers in the cluster
    */
@@ -413,7 +427,7 @@
       }
       TaskTrackerRunner taskTracker;
       taskTracker = new TaskTrackerRunner(idx, numDir, 
-          hosts == null ? null : hosts[idx]);
+          hosts == null ? null : hosts[idx], conf);
       
       Thread taskTrackerThread = new Thread(taskTracker);
       taskTrackerList.add(taskTracker);

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestHighRAMJobs.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestHighRAMJobs.java?rev=689064&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestHighRAMJobs.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestHighRAMJobs.java Tue Aug 26 06:07:43 2008
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.util.ToolRunner;
+
+import junit.framework.TestCase;
+
+/**
+ * This test class tests the functionality related to configuring, reporting
+ * and computing memory related parameters in a Map/Reduce cluster.
+ * 
+ * Each test sets up a {@link MiniMRCluster} with a locally defined 
+ * {@link org.apache.hadoop.mapred.TaskScheduler}. This scheduler validates
+ * that the memory related configuration is correctly computed and reported
+ * from the tasktracker in
+ * {@link org.apache.hadoop.mapred.TaskScheduler#assignTasks(TaskTrackerStatus)}.
+ */
+public class TestHighRAMJobs extends TestCase {
+
+  private static final Log LOG = LogFactory.getLog(TestHighRAMJobs.class);
+
+  private static final String DEFAULT_SLEEP_JOB_MAP_COUNT = "1";
+  private static final String DEFAULT_SLEEP_JOB_REDUCE_COUNT = "1";
+  private static final String DEFAULT_MAP_SLEEP_TIME = "1000";
+  private static final String DEFAULT_REDUCE_SLEEP_TIME = "1000";
+  // Alias of the production sentinel so the test cannot drift from it.
+  private static final long DISABLED_VIRTUAL_MEMORY_LIMIT =
+    JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT;
+  
+  private MiniDFSCluster miniDFSCluster;
+  private MiniMRCluster miniMRCluster;
+  
+  /**
+   * Scheduler that checks the memory values a tasktracker reports in its
+   * status against the values the test configured, then delegates the
+   * actual assignment to {@link JobQueueTaskScheduler}.
+   * 
+   * The expected values are read from the scheduler's configuration under
+   * the keys "initialFreeMemory", "memoryPerTaskOnTT" and "memoryPerTask"
+   * (set by setUpCluster). Failures are recorded rather than thrown, and
+   * the test thread inspects them through hasTestPassed() and
+   * getFailureMessage().
+   */
+  public static class FakeTaskScheduler extends JobQueueTaskScheduler {
+    
+    // Guarded by "this": written from the thread calling assignTasks and
+    // read from the test thread.
+    private boolean hasPassed = true;
+    private String message;
+    private boolean isFirstTime = true;
+    
+    public FakeTaskScheduler() {
+      super();
+    }
+    
+    public synchronized boolean hasTestPassed() {
+      return hasPassed;
+    }
+    
+    public synchronized String getFailureMessage() {
+      return message;
+    }
+
+    /**
+     * Record a failed check. Earlier failure messages are kept instead of
+     * being overwritten by later ones.
+     */
+    private synchronized void recordFailure(String failure) {
+      hasPassed = false;
+      message = (message == null) ? failure : message + "; " + failure;
+    }
+
+    /** @return true exactly once, for the first status this scheduler sees. */
+    private synchronized boolean consumeFirstTime() {
+      boolean first = isFirstTime;
+      isFirstTime = false;
+      return first;
+    }
+    
+    @Override
+    public List<Task> assignTasks(TaskTrackerStatus status) 
+                                          throws IOException {
+      long reportedFreeMemory =
+        status.getResourceStatus().getFreeVirtualMemory();
+      long reportedMemoryPerTaskOnTT =
+        status.getResourceStatus().getDefaultVirtualMemoryPerTask();
+      TestHighRAMJobs.LOG.info("status = " + reportedFreeMemory);
+
+      long initialFreeMemory = getConf().getLong("initialFreeMemory", 0L);
+      long memoryPerTaskOnTT = getConf().getLong("memoryPerTaskOnTT", 0L);
+      long configuredMemoryPerTask = getConf().getLong("memoryPerTask", 0L);
+
+      if (consumeFirstTime()) {
+        // Nothing is running yet: the raw configured values must be reported.
+        if (initialFreeMemory != reportedFreeMemory) {
+          recordFailure("Initial memory expected = " + initialFreeMemory
+                      + " reported = " + reportedFreeMemory);
+        }
+        if (memoryPerTaskOnTT != reportedMemoryPerTaskOnTT) {
+          recordFailure("Memory per task on TT expected = " + memoryPerTaskOnTT
+                      + " reported = " + reportedMemoryPerTaskOnTT);
+        }
+      } else if (initialFreeMemory != DISABLED_VIRTUAL_MEMORY_LIMIT) {
+        // Each running task reserves the job's limit if one is configured,
+        // otherwise the tasktracker's default per-task share.
+        long memoryPerTask =
+          (configuredMemoryPerTask != DISABLED_VIRTUAL_MEMORY_LIMIT)
+            ? configuredMemoryPerTask : memoryPerTaskOnTT;
+        int runningTaskCount = status.countMapTasks() +
+                              status.countReduceTasks();
+        long expectedFreeMemory = initialFreeMemory - 
+                                (memoryPerTask * runningTaskCount);
+
+        TestHighRAMJobs.LOG.info("expected free memory = " + 
+                                  expectedFreeMemory + ", reported = " + 
+                                  reportedFreeMemory);
+        if (expectedFreeMemory != reportedFreeMemory) {
+          recordFailure("Expected free memory after " + runningTaskCount
+                      + " tasks are scheduled = " + expectedFreeMemory
+                      + ", reported = " + reportedFreeMemory);
+        }
+      }
+      return super.assignTasks(status);
+    }
+  }
+  
+  /* Test that verifies default values are configured and reported
+   * correctly.
+   */
+  public void testDefaultValuesForHighRAMJobs() throws Exception {
+    long defaultMemoryLimit = DISABLED_VIRTUAL_MEMORY_LIMIT;
+    try {
+      setUpCluster(defaultMemoryLimit, defaultMemoryLimit, 
+                    defaultMemoryLimit, null);
+      runJob(defaultMemoryLimit, DEFAULT_MAP_SLEEP_TIME, 
+          DEFAULT_REDUCE_SLEEP_TIME, DEFAULT_SLEEP_JOB_MAP_COUNT, 
+          DEFAULT_SLEEP_JOB_REDUCE_COUNT);
+      verifyTestResults();
+    } finally {
+      tearDownCluster();
+    }
+  }
+  
+  /* Test that verifies default value for memory per task on TT
+   * when the number of slots is non-default.
+   */
+  public void testDefaultMemoryPerTask() throws Exception {
+    long maxVmem = 1024*1024*1024L;
+    JobConf conf = new JobConf();
+    // change number of slots to 2.
+    conf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
+    conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
+    long defaultMemPerTaskOnTT = maxVmem / 2;
+    try {
+      setUpCluster(maxVmem, defaultMemPerTaskOnTT, 
+                    DISABLED_VIRTUAL_MEMORY_LIMIT, conf);
+      runJob(DISABLED_VIRTUAL_MEMORY_LIMIT, DEFAULT_MAP_SLEEP_TIME,
+              DEFAULT_REDUCE_SLEEP_TIME, DEFAULT_SLEEP_JOB_MAP_COUNT,
+              DEFAULT_SLEEP_JOB_REDUCE_COUNT);
+      verifyTestResults();
+    } finally {
+      tearDownCluster();
+    }
+  }
+  
+  /* Test that verifies configured value for free memory is
+   * reported correctly. The test does NOT configure a value for
+   * memory per task. Hence, it also verifies that the default value
+   * per task on the TT is calculated correctly.
+   */
+  public void testConfiguredValueForFreeMemory() throws Exception {
+    long maxVmem = 1024*1024*1024L;
+    long defaultMemPerTaskOnTT = maxVmem/4; // 4 = default number of slots.
+    try {
+      setUpCluster(maxVmem, defaultMemPerTaskOnTT,
+                    DISABLED_VIRTUAL_MEMORY_LIMIT, null);
+      runJob(DISABLED_VIRTUAL_MEMORY_LIMIT, "10000",
+              DEFAULT_REDUCE_SLEEP_TIME, DEFAULT_SLEEP_JOB_MAP_COUNT,
+              DEFAULT_SLEEP_JOB_REDUCE_COUNT);
+      verifyTestResults();
+    } finally {
+      tearDownCluster();
+    }
+  }
+  
+  /* Test that verifies free memory is reported correctly when a job
+   * asks for more than the default per-task share.
+   */
+  public void testHighRAMJob() throws Exception {
+    long maxVmem = 1024*1024*1024L;
+    long defaultMemPerTaskOnTT = maxVmem/4; // 4 = default number of slots.
+    /* Set a HIGH RAM requirement for a job. As 4 is the
+     * default number of slots, we set up the memory limit
+     * per task to be more than 25%. 
+     */
+    long maxVmemPerTask = maxVmem/3;
+    try {
+      setUpCluster(maxVmem, defaultMemPerTaskOnTT,
+                    maxVmemPerTask, null);
+      /* set up sleep limits higher, so the scheduler will see varying
+       * number of running tasks at a time. Also modify the number of
+       * map tasks so we test the iteration over more than one task.
+       */
+      runJob(maxVmemPerTask, "10000", "10000", "2", 
+                      DEFAULT_SLEEP_JOB_REDUCE_COUNT);
+      verifyTestResults();
+    } finally {
+      tearDownCluster();
+    }
+  }
+  
+  /**
+   * Start a MiniDFSCluster and a MiniMRCluster whose job tracker uses
+   * {@link FakeTaskScheduler}. The expected memory values are passed to
+   * the scheduler through the configuration.
+   *
+   * @param initialFreeMemory tracker-wide limit, or the disabled sentinel
+   * @param memoryPerTaskOnTT expected default per-task share on the TT
+   * @param memoryPerTask per-task limit the job will request, or the
+   *                      disabled sentinel
+   * @param conf base configuration, or null for a fresh JobConf
+   */
+  private void setUpCluster(long initialFreeMemory, long memoryPerTaskOnTT,
+                            long memoryPerTask, JobConf conf) 
+                              throws Exception {
+    if (conf == null) {
+      conf = new JobConf();
+    }
+    conf.setClass("mapred.jobtracker.taskScheduler", 
+        TestHighRAMJobs.FakeTaskScheduler.class,
+        TaskScheduler.class);
+    if (initialFreeMemory != DISABLED_VIRTUAL_MEMORY_LIMIT) {
+      conf.setMaxVirtualMemoryForTasks(initialFreeMemory);  
+    }
+    conf.setLong("initialFreeMemory", initialFreeMemory);
+    conf.setLong("memoryPerTaskOnTT", memoryPerTaskOnTT);
+    conf.setLong("memoryPerTask", memoryPerTask);
+    miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
+    FileSystem fileSys = miniDFSCluster.getFileSystem();
+    String namenode = fileSys.getUri().toString();
+    miniMRCluster = new MiniMRCluster(1, namenode, 3, 
+                      null, null, conf);    
+  }
+  
+  /**
+   * Run a sleep job against the mini cluster. When memoryPerTask is not
+   * the disabled sentinel, it is set as the job's mapred.task.maxmemory.
+   */
+  private void runJob(long memoryPerTask, String mapSleepTime,
+                        String reduceSleepTime, String mapTaskCount,
+                        String reduceTaskCount) 
+                                        throws Exception {
+    Configuration sleepJobConf = new Configuration();
+    sleepJobConf.set("mapred.job.tracker", "localhost:"
+                              + miniMRCluster.getJobTrackerPort());
+    if (memoryPerTask != DISABLED_VIRTUAL_MEMORY_LIMIT) {
+      sleepJobConf.setLong("mapred.task.maxmemory", memoryPerTask);
+    }
+    launchSleepJob(mapSleepTime, reduceSleepTime, 
+                    mapTaskCount, reduceTaskCount, sleepJobConf);    
+  }
+
+  private void launchSleepJob(String mapSleepTime, String reduceSleepTime,
+                              String mapTaskCount, String reduceTaskCount,
+                              Configuration conf) throws Exception {
+    String[] args = { "-m", mapTaskCount, "-r", reduceTaskCount,
+                      "-mt", mapSleepTime, "-rt", reduceSleepTime };
+    ToolRunner.run(conf, new SleepJob(), args);
+  }
+
+  /** Fail the test if the fake scheduler recorded any failed check. */
+  private void verifyTestResults() {
+    FakeTaskScheduler scheduler = 
+      (FakeTaskScheduler)miniMRCluster.getJobTrackerRunner().
+                              getJobTracker().getTaskScheduler();
+    assertTrue(scheduler.getFailureMessage(), scheduler.hasTestPassed());
+  }
+  
+  /** Shut down whichever clusters were started and forget them. */
+  private void tearDownCluster() {
+    if (miniMRCluster != null) {
+      miniMRCluster.shutdown();
+      miniMRCluster = null;
+    }
+    if (miniDFSCluster != null) {
+      miniDFSCluster.shutdown();
+      miniDFSCluster = null;
+    }
+  }
+}



Mime
View raw message