hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ama...@apache.org
Subject svn commit: r1176934 [1/2] - in /hadoop/common/branches/branch-0.20-security-205: ./ src/core/org/apache/hadoop/util/ src/mapred/ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapreduce/ src/test/org/apache/hadoop/mapred/ src/test/o...
Date Wed, 28 Sep 2011 15:53:41 GMT
Author: amarrk
Date: Wed Sep 28 15:53:40 2011
New Revision: 1176934

URL: http://svn.apache.org/viewvc?rev=1176934&view=rev
Log:
MAPREDUCE-2777. Backport of MAPREDUCE-220 and MAPREDUCE-2469. Includes adding cumulative CPU usage and total heap usage to task conters. (amarrk)

Added:
    hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/LinuxResourceCalculatorPlugin.java
    hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/ResourceCalculatorPlugin.java
    hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/mapred/TestTTResourceReporting.java
    hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/util/DummyResourceCalculatorPlugin.java
    hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/util/TestLinuxResourceCalculatorPlugin.java
Modified:
    hadoop/common/branches/branch-0.20-security-205/CHANGES.txt
    hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java
    hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/MemoryCalculatorPlugin.java
    hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java
    hadoop/common/branches/branch-0.20-security-205/src/mapred/mapred-default.xml
    hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
    hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
    hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
    hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapreduce/Counter.java
    hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/mapred/TestCounters.java
    hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/mapred/TestJobCounters.java
    hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/mapred/TestTTMemoryReporting.java
    hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java

Modified: hadoop/common/branches/branch-0.20-security-205/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/CHANGES.txt?rev=1176934&r1=1176933&r2=1176934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security-205/CHANGES.txt Wed Sep 28 15:53:40 2011
@@ -4,6 +4,10 @@ Release 0.20.205.0 - 2011.09.27
 
   NEW FEATURES
 
+    MAPREDUCE-2777. Backport of MAPREDUCE-220 and MAPREDUCE-2469. 
+    Includes adding cumulative CPU usage and total heap usage to task 
+    conters. (amarrk)
+
     HDFS-2202. Add a new DFSAdmin command to set balancer bandwidth of
     datanodes without restarting.  (Eric Payne via szetszwo)
 

Modified: hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java?rev=1176934&r1=1176933&r2=1176934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java Wed Sep 28 15:53:40 2011
@@ -18,115 +18,28 @@
 
 package org.apache.hadoop.util;
 
-import java.io.BufferedReader;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 /**
  * Plugin to calculate virtual and physical memories on Linux systems.
+ * @deprecated Use {@link org.apache.hadoop.util.LinuxResourceCalculatorPlugin}
+ *             instead
  */
+@Deprecated
 public class LinuxMemoryCalculatorPlugin extends MemoryCalculatorPlugin {
-  private static final Log LOG =
-      LogFactory.getLog(LinuxMemoryCalculatorPlugin.class);
-
-  /**
-   * proc's meminfo virtual file has keys-values in the format
-   * "key:[ \t]*value[ \t]kB".
-   */
-  private static final String PROCFS_MEMFILE = "/proc/meminfo";
-  private static final Pattern PROCFS_MEMFILE_FORMAT =
-      Pattern.compile("^([a-zA-Z]*):[ \t]*([0-9]*)[ \t]kB");
-
-  // We just need the values for the keys MemTotal and SwapTotal
-  private static final String MEMTOTAL_STRING = "MemTotal";
-  private static final String SWAPTOTAL_STRING = "SwapTotal";
-
-  private long ramSize = 0;
-  private long swapSize = 0;
-
-  boolean readMemInfoFile = false;
-
-  private void readProcMemInfoFile() {
-
-    if (readMemInfoFile) {
-      return;
-    }
-
-    // Read "/proc/memInfo" file
-    BufferedReader in = null;
-    FileReader fReader = null;
-    try {
-      fReader = new FileReader(PROCFS_MEMFILE);
-      in = new BufferedReader(fReader);
-    } catch (FileNotFoundException f) {
-      // shouldn't happen....
-      return;
-    }
-
-    Matcher mat = null;
-
-    try {
-      String str = in.readLine();
-      while (str != null) {
-        mat = PROCFS_MEMFILE_FORMAT.matcher(str);
-        if (mat.find()) {
-          if (mat.group(1).equals(MEMTOTAL_STRING)) {
-            ramSize = Long.parseLong(mat.group(2));
-          } else if (mat.group(1).equals(SWAPTOTAL_STRING)) {
-            swapSize = Long.parseLong(mat.group(2));
-          }
-        }
-        str = in.readLine();
-      }
-    } catch (IOException io) {
-      LOG.warn("Error reading the stream " + io);
-    } finally {
-      // Close the streams
-      try {
-        fReader.close();
-        try {
-          in.close();
-        } catch (IOException i) {
-          LOG.warn("Error closing the stream " + in);
-        }
-      } catch (IOException i) {
-        LOG.warn("Error closing the stream " + fReader);
-      }
-    }
-
-    readMemInfoFile = true;
+  private LinuxResourceCalculatorPlugin resourceCalculatorPlugin;
+  // Use everything from LinuxResourceCalculatorPlugin
+  public LinuxMemoryCalculatorPlugin() {
+    resourceCalculatorPlugin = new LinuxResourceCalculatorPlugin();
   }
 
   /** {@inheritDoc} */
   @Override
   public long getPhysicalMemorySize() {
-    readProcMemInfoFile();
-    return ramSize * 1024;
+    return resourceCalculatorPlugin.getPhysicalMemorySize();
   }
 
   /** {@inheritDoc} */
   @Override
   public long getVirtualMemorySize() {
-    readProcMemInfoFile();
-    return (ramSize + swapSize) * 1024;
-  }
-
-  /**
-   * Test the {@link LinuxMemoryCalculatorPlugin}
-   * 
-   * @param args
-   */
-  public static void main(String[] args) {
-    LinuxMemoryCalculatorPlugin plugin = new LinuxMemoryCalculatorPlugin();
-    System.out.println("Physical memory Size(bytes) : "
-        + plugin.getPhysicalMemorySize());
-    System.out.println("Total Virtual memory Size(bytes) : "
-        + plugin.getVirtualMemorySize());
+    return resourceCalculatorPlugin.getVirtualMemorySize();
   }
 }
\ No newline at end of file

Added: hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/LinuxResourceCalculatorPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/LinuxResourceCalculatorPlugin.java?rev=1176934&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/LinuxResourceCalculatorPlugin.java (added)
+++ hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/LinuxResourceCalculatorPlugin.java Wed Sep 28 15:53:40 2011
@@ -0,0 +1,407 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Plugin to calculate resource information on Linux systems.
+ */
+public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin {
+  private static final Log LOG =
+      LogFactory.getLog(LinuxResourceCalculatorPlugin.class);
+
+  /**
+   * proc's meminfo virtual file has keys-values in the format
+   * "key:[ \t]*value[ \t]kB".
+   */
+  private static final String PROCFS_MEMFILE = "/proc/meminfo";
+  private static final Pattern PROCFS_MEMFILE_FORMAT =
+      Pattern.compile("^([a-zA-Z]*):[ \t]*([0-9]*)[ \t]kB");
+
+  // We need the values for the following keys in meminfo
+  private static final String MEMTOTAL_STRING = "MemTotal";
+  private static final String SWAPTOTAL_STRING = "SwapTotal";
+  private static final String MEMFREE_STRING = "MemFree";
+  private static final String SWAPFREE_STRING = "SwapFree";
+  private static final String INACTIVE_STRING = "Inactive";
+  private static final int UNAVAILABLE = -1;
+
+  /**
+   * Patterns for parsing /proc/cpuinfo
+   */
+  private static final String PROCFS_CPUINFO = "/proc/cpuinfo";
+  private static final Pattern PROCESSOR_FORMAT =
+      Pattern.compile("^processor[ \t]:[ \t]*([0-9]*)");
+  private static final Pattern FREQUENCY_FORMAT =
+      Pattern.compile("^cpu MHz[ \t]*:[ \t]*([0-9.]*)");
+
+  /**
+   * Pattern for parsing /proc/stat
+   */
+  private static final String PROCFS_STAT = "/proc/stat";
+  private static final Pattern CPU_TIME_FORMAT =
+    Pattern.compile("^cpu[ \t]*([0-9]*)" +
+    		            "[ \t]*([0-9]*)[ \t]*([0-9]*)[ \t].*");
+  
+  private String procfsMemFile;
+  private String procfsCpuFile;
+  private String procfsStatFile;
+  long jiffyLengthInMillis;
+
+  private long ramSize = 0;
+  private long swapSize = 0;
+  private long ramSizeFree = 0;  // free ram space on the machine (kB)
+  private long swapSizeFree = 0; // free swap space on the machine (kB)
+  private long inactiveSize = 0; // inactive cache memory (kB)
+  private int numProcessors = 0; // number of processors on the system
+  private long cpuFrequency = 0L; // CPU frequency on the system (kHz)
+  private long cumulativeCpuTime = 0L; // CPU used time since system is on (ms)
+  private long lastCumulativeCpuTime = 0L; // CPU used time read last time (ms)
+  // Unix timestamp while reading the CPU time (ms)
+  private float cpuUsage = UNAVAILABLE;
+  private long sampleTime = UNAVAILABLE;
+  private long lastSampleTime = UNAVAILABLE;
+  private ProcfsBasedProcessTree pTree = null;
+
+  boolean readMemInfoFile = false;
+  boolean readCpuInfoFile = false;
+  
+  /**
+   * Get current time
+   * @return Unix time stamp in millisecond
+   */
+  long getCurrentTime() {
+    return System.currentTimeMillis();
+  }
+  
+  public LinuxResourceCalculatorPlugin() {
+    procfsMemFile = PROCFS_MEMFILE;
+    procfsCpuFile = PROCFS_CPUINFO;
+    procfsStatFile = PROCFS_STAT;
+    jiffyLengthInMillis = ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS;
+    String pid = System.getenv().get("JVM_PID");
+    pTree = new ProcfsBasedProcessTree(pid);
+  }
+  
+  /**
+   * Constructor which allows assigning the /proc/ directories. This will be
+   * used only in unit tests
+   * @param procfsMemFile fake file for /proc/meminfo
+   * @param procfsCpuFile fake file for /proc/cpuinfo
+   * @param procfsStatFile fake file for /proc/stat
+   * @param jiffyLengthInMillis fake jiffy length value
+   */
+  public LinuxResourceCalculatorPlugin(String procfsMemFile,
+                                       String procfsCpuFile,
+                                       String procfsStatFile,
+                                       long jiffyLengthInMillis) {
+    this.procfsMemFile = procfsMemFile;
+    this.procfsCpuFile = procfsCpuFile;
+    this.procfsStatFile = procfsStatFile;
+    this.jiffyLengthInMillis = jiffyLengthInMillis;
+    String pid = System.getenv().get("JVM_PID");
+    pTree = new ProcfsBasedProcessTree(pid);
+  }
+
+  /**
+   * Read /proc/meminfo, parse and compute memory information only once
+   */
+  private void readProcMemInfoFile() {
+    readProcMemInfoFile(false);
+  }
+
+  /**
+   * Read /proc/meminfo, parse and compute memory information
+   * @param readAgain if false, read only on the first time
+   */
+  private void readProcMemInfoFile(boolean readAgain) {
+
+    if (readMemInfoFile && !readAgain) {
+      return;
+    }
+
+    // Read "/proc/memInfo" file
+    BufferedReader in = null;
+    FileReader fReader = null;
+    try {
+      fReader = new FileReader(procfsMemFile);
+      in = new BufferedReader(fReader);
+    } catch (FileNotFoundException f) {
+      // shouldn't happen....
+      return;
+    }
+
+    Matcher mat = null;
+
+    try {
+      String str = in.readLine();
+      while (str != null) {
+        mat = PROCFS_MEMFILE_FORMAT.matcher(str);
+        if (mat.find()) {
+          if (mat.group(1).equals(MEMTOTAL_STRING)) {
+            ramSize = Long.parseLong(mat.group(2));
+          } else if (mat.group(1).equals(SWAPTOTAL_STRING)) {
+            swapSize = Long.parseLong(mat.group(2));
+          } else if (mat.group(1).equals(MEMFREE_STRING)) {
+            ramSizeFree = Long.parseLong(mat.group(2));
+          } else if (mat.group(1).equals(SWAPFREE_STRING)) {
+            swapSizeFree = Long.parseLong(mat.group(2));
+          } else if (mat.group(1).equals(INACTIVE_STRING)) {
+            inactiveSize = Long.parseLong(mat.group(2));
+          }
+        }
+        str = in.readLine();
+      }
+    } catch (IOException io) {
+      LOG.warn("Error reading the stream " + io);
+    } finally {
+      // Close the streams
+      try {
+        fReader.close();
+        try {
+          in.close();
+        } catch (IOException i) {
+          LOG.warn("Error closing the stream " + in);
+        }
+      } catch (IOException i) {
+        LOG.warn("Error closing the stream " + fReader);
+      }
+    }
+
+    readMemInfoFile = true;
+  }
+
+  /**
+   * Read /proc/cpuinfo, parse and calculate CPU information
+   */
+  private void readProcCpuInfoFile() {
+    // This directory needs to be read only once
+    if (readCpuInfoFile) {
+      return;
+    }
+    // Read "/proc/cpuinfo" file
+    BufferedReader in = null;
+    FileReader fReader = null;
+    try {
+      fReader = new FileReader(procfsCpuFile);
+      in = new BufferedReader(fReader);
+    } catch (FileNotFoundException f) {
+      // shouldn't happen....
+      return;
+    }
+    Matcher mat = null;
+    try {
+      numProcessors = 0;
+      String str = in.readLine();
+      while (str != null) {
+        mat = PROCESSOR_FORMAT.matcher(str);
+        if (mat.find()) {
+          numProcessors++;
+        }
+        mat = FREQUENCY_FORMAT.matcher(str);
+        if (mat.find()) {
+          cpuFrequency = (long)(Double.parseDouble(mat.group(1)) * 1000); // kHz
+        }
+        str = in.readLine();
+      }
+    } catch (IOException io) {
+      LOG.warn("Error reading the stream " + io);
+    } finally {
+      // Close the streams
+      try {
+        fReader.close();
+        try {
+          in.close();
+        } catch (IOException i) {
+          LOG.warn("Error closing the stream " + in);
+        }
+      } catch (IOException i) {
+        LOG.warn("Error closing the stream " + fReader);
+      }
+    }
+    readCpuInfoFile = true;
+  }
+
+  /**
+   * Read /proc/stat file, parse and calculate cumulative CPU
+   */
+  private void readProcStatFile() {
+    // Read "/proc/stat" file
+    BufferedReader in = null;
+    FileReader fReader = null;
+    try {
+      fReader = new FileReader(procfsStatFile);
+      in = new BufferedReader(fReader);
+    } catch (FileNotFoundException f) {
+      // shouldn't happen....
+      return;
+    }
+
+    Matcher mat = null;
+    try {
+      String str = in.readLine();
+      while (str != null) {
+        mat = CPU_TIME_FORMAT.matcher(str);
+        if (mat.find()) {
+          long uTime = Long.parseLong(mat.group(1));
+          long nTime = Long.parseLong(mat.group(2));
+          long sTime = Long.parseLong(mat.group(3));
+          cumulativeCpuTime = uTime + nTime + sTime; // milliseconds
+          break;
+        }
+        str = in.readLine();
+      }
+      cumulativeCpuTime *= jiffyLengthInMillis;
+    } catch (IOException io) {
+      LOG.warn("Error reading the stream " + io);
+    } finally {
+      // Close the streams
+      try {
+        fReader.close();
+        try {
+          in.close();
+        } catch (IOException i) {
+          LOG.warn("Error closing the stream " + in);
+        }
+      } catch (IOException i) {
+        LOG.warn("Error closing the stream " + fReader);
+      }
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getPhysicalMemorySize() {
+    readProcMemInfoFile();
+    return ramSize * 1024;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getVirtualMemorySize() {
+    readProcMemInfoFile();
+    return (ramSize + swapSize) * 1024;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getAvailablePhysicalMemorySize() {
+    readProcMemInfoFile(true);
+    return (ramSizeFree + inactiveSize) * 1024;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getAvailableVirtualMemorySize() {
+    readProcMemInfoFile(true);
+    return (ramSizeFree + swapSizeFree + inactiveSize) * 1024;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public int getNumProcessors() {
+    readProcCpuInfoFile();
+    return numProcessors;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getCpuFrequency() {
+    readProcCpuInfoFile();
+    return cpuFrequency;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getCumulativeCpuTime() {
+    readProcStatFile();
+    return cumulativeCpuTime;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public float getCpuUsage() {
+    readProcStatFile();
+    sampleTime = getCurrentTime();
+    if (lastSampleTime == UNAVAILABLE ||
+        lastSampleTime > sampleTime) {
+      // lastSampleTime > sampleTime may happen when the system time is changed
+      lastSampleTime = sampleTime;
+      lastCumulativeCpuTime = cumulativeCpuTime;
+      return cpuUsage;
+    }
+    // When lastSampleTime is sufficiently old, update cpuUsage.
+    // Also take a sample of the current time and cumulative CPU time for the
+    // use of the next calculation.
+    final long MINIMUM_UPDATE_INTERVAL = 10 * jiffyLengthInMillis;
+    if (sampleTime > lastSampleTime + MINIMUM_UPDATE_INTERVAL) {
+	    cpuUsage = (float)(cumulativeCpuTime - lastCumulativeCpuTime) * 100F /
+	               ((float)(sampleTime - lastSampleTime) * getNumProcessors());
+	    lastSampleTime = sampleTime;
+      lastCumulativeCpuTime = cumulativeCpuTime;
+    }
+    return cpuUsage;
+  }
+
+  /**
+   * Test the {@link LinuxResourceCalculatorPlugin}
+   *
+   * @param args
+   */
+  public static void main(String[] args) {
+    LinuxResourceCalculatorPlugin plugin = new LinuxResourceCalculatorPlugin();
+    System.out.println("Physical memory Size (bytes) : "
+        + plugin.getPhysicalMemorySize());
+    System.out.println("Total Virtual memory Size (bytes) : "
+        + plugin.getVirtualMemorySize());
+    System.out.println("Available Physical memory Size (bytes) : "
+        + plugin.getAvailablePhysicalMemorySize());
+    System.out.println("Total Available Virtual memory Size (bytes) : "
+        + plugin.getAvailableVirtualMemorySize());
+    System.out.println("Number of Processors : " + plugin.getNumProcessors());
+    System.out.println("CPU frequency (kHz) : " + plugin.getCpuFrequency());
+    System.out.println("Cumulative CPU time (ms) : " +
+            plugin.getCumulativeCpuTime());
+    try {
+      // Sleep so we can compute the CPU usage
+      Thread.sleep(500L);
+    } catch (InterruptedException e) {
+      // do nothing
+    }
+    System.out.println("CPU usage % : " + plugin.getCpuUsage());
+  }
+
+  @Override
+  public ProcResourceValues getProcResourceValues() {
+    pTree = pTree.getProcessTree();
+    long cpuTime = pTree.getCumulativeCpuTime();
+    long pMem = pTree.getCumulativeRssmem();
+    long vMem = pTree.getCumulativeVmem();
+    return new ProcResourceValues(cpuTime, pMem, vMem);
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/MemoryCalculatorPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/MemoryCalculatorPlugin.java?rev=1176934&r1=1176933&r2=1176934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/MemoryCalculatorPlugin.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/MemoryCalculatorPlugin.java Wed Sep 28 15:53:40 2011
@@ -24,7 +24,11 @@ import org.apache.hadoop.conf.Configured
 /**
  * Plugin to calculate virtual and physical memories on the system.
  * 
+ * @deprecated Use
+ *             {@link org.apache.hadoop.util.ResourceCalculatorPlugin}
+ *             instead
  */
+@Deprecated
 public abstract class MemoryCalculatorPlugin extends Configured {
 
   /**

Modified: hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java?rev=1176934&r1=1176933&r2=1176934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java Wed Sep 28 15:53:40 2011
@@ -33,7 +33,7 @@ import java.util.LinkedList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
 /**
  * A Proc file-system based ProcessTree. Works only on Linux.
@@ -44,17 +44,49 @@ public class ProcfsBasedProcessTree exte
       .getLog(ProcfsBasedProcessTree.class);
 
   private static final String PROCFS = "/proc/";
-  private static final Pattern PROCFS_STAT_FILE_FORMAT = Pattern
-      .compile("^([0-9-]+)\\s([^\\s]+)\\s[^\\s]\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+\\s){16}([0-9]+)(\\s[0-9-]+){16}");
+  private static final Pattern PROCFS_STAT_FILE_FORMAT = Pattern .compile(
+    "^([0-9-]+)\\s([^\\s]+)\\s[^\\s]\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+)\\s" +
+    "([0-9-]+\\s){7}([0-9]+)\\s([0-9]+)\\s([0-9-]+\\s){7}([0-9]+)\\s([0-9]+)" +
+    "(\\s[0-9-]+){15}");
 
   static final String PROCFS_STAT_FILE = "stat";
   static final String PROCFS_CMDLINE_FILE = "cmdline";
+  public static final long PAGE_SIZE;
+  static {
+    ShellCommandExecutor shellExecutor =
+            new ShellCommandExecutor(new String[]{"getconf",  "PAGESIZE"});
+    long pageSize = -1;
+    try {
+      shellExecutor.execute();
+      pageSize = Long.parseLong(shellExecutor.getOutput().replace("\n", ""));
+    } catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+    } finally {
+      PAGE_SIZE = pageSize;
+    }
+  }
+  public static final long JIFFY_LENGTH_IN_MILLIS; // in millisecond
+  static {
+    ShellCommandExecutor shellExecutor =
+            new ShellCommandExecutor(new String[]{"getconf",  "CLK_TCK"});
+    long jiffiesPerSecond = -1;
+    try {
+      shellExecutor.execute();
+      jiffiesPerSecond = Long.parseLong(shellExecutor.getOutput().replace("\n", ""));
+    } catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+    } finally {
+      JIFFY_LENGTH_IN_MILLIS = jiffiesPerSecond != -1 ?
+                     Math.round(1000D / jiffiesPerSecond) : -1;
+    }
+  }
 
   // to enable testing, using this variable which can be configured
   // to a test directory.
   private String procfsDir;
   
   private Integer pid = -1;
+  private Long cpuTime = 0L;
 
   private Map<Integer, ProcessInfo> processTree = new HashMap<Integer, ProcessInfo>();
 
@@ -149,11 +181,12 @@ public class ProcfsBasedProcessTree exte
         pInfoQueue.addAll(pInfo.getChildren());
       }
 
-      // update age values.
+      // update age values and compute the number of jiffies since last update
       for (Map.Entry<Integer, ProcessInfo> procs : processTree.entrySet()) {
         ProcessInfo oldInfo = oldProcs.get(procs.getKey());
-        if (oldInfo != null) {
-          if (procs.getValue() != null) {
+        if (procs.getValue() != null) {
+          procs.getValue().updateJiffy(oldInfo);
+          if (oldInfo != null) {
             procs.getValue().updateAge(oldInfo);  
           }
         }
@@ -196,7 +229,7 @@ public class ProcfsBasedProcessTree exte
   }
 
   private static final String PROCESSTREE_DUMP_FORMAT =
-      "\t|- %d %d %d %d %s %d %s\n";
+      "\t|- %d %d %d %d %s %d %d %d %d %s\n";
 
   /**
    * Get a dump of the process-tree.
@@ -208,12 +241,14 @@ public class ProcfsBasedProcessTree exte
     StringBuilder ret = new StringBuilder();
     // The header.
     ret.append(String.format("\t|- PID PPID PGRPID SESSID CMD_NAME "
-        + "VMEM_USAGE(BYTES) FULL_CMD_LINE\n"));
+        + "USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) "
+        + "RSSMEM_USAGE(PAGES) FULL_CMD_LINE\n"));
     for (ProcessInfo p : processTree.values()) {
       if (p != null) {
         ret.append(String.format(PROCESSTREE_DUMP_FORMAT, p.getPid(), p
             .getPpid(), p.getPgrpId(), p.getSessionId(), p.getName(), p
-            .getVmem(), p.getCmdLine(procfsDir)));
+            .getUtime(), p.getStime(), p.getVmem(), p.getRssmemPage(), p
+            .getCmdLine(procfsDir)));
       }
     }
     return ret.toString();
@@ -231,6 +266,18 @@ public class ProcfsBasedProcessTree exte
   }
 
   /**
+   * Get the cumulative resident set size (rss) memory used by all the processes
+   * in the process-tree.
+   *
+   * @return cumulative rss memory used by the process-tree in bytes. return 0
+   *         if it cannot be calculated
+   */
+  public long getCumulativeRssmem() {
+    // include all processes.. all processes will be older than 0.
+    return getCumulativeRssmem(0);
+  }
+
+  /**
    * Get the cumulative virtual memory used by all the processes in the
    * process-tree that are older than the passed in age.
    * 
@@ -249,6 +296,50 @@ public class ProcfsBasedProcessTree exte
     return total;
   }
 
+  /**
+   * Get the cumulative resident set size (rss) memory used by all the processes
+   * in the process-tree that are older than the passed in age.
+   *
+   * @param olderThanAge processes above this age are included in the
+   *                      memory addition
+   * @return cumulative rss memory used by the process-tree in bytes,
+   *          for processes older than this age. return 0 if it cannot be
+   *          calculated
+   */
+  public long getCumulativeRssmem(int olderThanAge) {
+    if (PAGE_SIZE < 0) {
+      return 0;
+    }
+    long totalPages = 0;
+    for (ProcessInfo p : processTree.values()) {
+      if ((p != null) && (p.getAge() > olderThanAge)) {
+        totalPages += p.getRssmemPage();
+      }
+    }
+    return totalPages * PAGE_SIZE; // convert # pages to byte
+  }
+
+  /**
+   * Get the CPU time in millisecond used by all the processes in the
+   * process-tree since the process-tree created
+   *
+   * @return cumulative CPU time in millisecond since the process-tree created
+   *         return 0 if it cannot be calculated
+   */
+  public long getCumulativeCpuTime() {
+    if (JIFFY_LENGTH_IN_MILLIS < 0) {
+      return 0;
+    }
+    long incJiffies = 0;
+    for (ProcessInfo p : processTree.values()) {
+      if (p != null) {
+        incJiffies += p.dtime;
+      }
+    }
+    cpuTime += incJiffies * JIFFY_LENGTH_IN_MILLIS;
+    return cpuTime;
+  }
+
   private static Integer getValidPID(String pid) {
     Integer retPid = -1;
     try {
@@ -318,10 +409,11 @@ public class ProcfsBasedProcessTree exte
       Matcher m = PROCFS_STAT_FILE_FORMAT.matcher(str);
       boolean mat = m.find();
       if (mat) {
-        // Set ( name ) ( ppid ) ( pgrpId ) (session ) (vsize )
-        pinfo.updateProcessInfo(m.group(2), Integer.parseInt(m.group(3)), Integer
-            .parseInt(m.group(4)), Integer.parseInt(m.group(5)), Long
-            .parseLong(m.group(7)));
+        // Set (name) (ppid) (pgrpId) (session) (utime) (stime) (vsize) (rss)
+         pinfo.updateProcessInfo(m.group(2), Integer.parseInt(m.group(3)),
+                 Integer.parseInt(m.group(4)), Integer.parseInt(m.group(5)),
+                 Long.parseLong(m.group(7)), Long.parseLong(m.group(8)),
+                 Long.parseLong(m.group(10)), Long.parseLong(m.group(11)));
       }
     } catch (IOException io) {
       LOG.warn("Error reading the stream " + io);
@@ -368,8 +460,18 @@ public class ProcfsBasedProcessTree exte
     private Integer ppid; // parent process-id
     private Integer sessionId; // session-id
     private Long vmem; // virtual memory usage
+    private Long rssmemPage; // rss memory usage in # of pages
+    private Long utime = 0L; // # of jiffies in user mode
+    private Long stime = 0L; // # of jiffies in kernel mode
     // how many times has this process been seen alive
     private int age; 
+
+    // # of jiffies used since last update:
+    private Long dtime = 0L;
+    // dtime = (utime + stime) - (utimeOld + stimeOld)
+    // We need this to compute the cumulative CPU time
+    // because the subprocess may finish earlier than root process
+
     private List<ProcessInfo> children = new ArrayList<ProcessInfo>(); // list of children
 
     public ProcessInfo(int pid) {
@@ -402,17 +504,41 @@ public class ProcfsBasedProcessTree exte
       return vmem;
     }
 
+    public Long getUtime() {
+      return utime;
+    }
+
+    public Long getStime() {
+      return stime;
+    }
+
+    public Long getDtime() {
+      return dtime;
+    }
+
+    public Long getRssmemPage() { // get rss # of pages
+      return rssmemPage;
+    }
+
     public int getAge() {
       return age;
     }
     
     public void updateProcessInfo(String name, Integer ppid, Integer pgrpId,
-        Integer sessionId, Long vmem) {
+        Integer sessionId, Long utime, Long stime, Long vmem, Long rssmem) {
       this.name = name;
       this.ppid = ppid;
       this.pgrpId = pgrpId;
       this.sessionId = sessionId;
+      this.utime = utime;
+      this.stime = stime;
       this.vmem = vmem;
+      this.rssmemPage = rssmem;
+    }
+
+    public void updateJiffy(ProcessInfo oldInfo) {
+      this.dtime = (oldInfo == null ? this.utime + this.stime
+              : (this.utime + this.stime) - (oldInfo.utime + oldInfo.stime));
     }
 
     public void updateAge(ProcessInfo oldInfo) {

Added: hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/ResourceCalculatorPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/ResourceCalculatorPlugin.java?rev=1176934&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/ResourceCalculatorPlugin.java (added)
+++ hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/ResourceCalculatorPlugin.java Wed Sep 28 15:53:40 2011
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Plugin to calculate resource information on the system.
+ * 
+ */
+public abstract class ResourceCalculatorPlugin extends Configured {
+
+  /**
+   * Obtain the total size of the virtual memory present in the system.
+   * 
+   * @return virtual memory size in bytes.
+   */
+  public abstract long getVirtualMemorySize();
+
+  /**
+   * Obtain the total size of the physical memory present in the system.
+   * 
+   * @return physical memory size bytes.
+   */
+  public abstract long getPhysicalMemorySize();
+
+  /**
+   * Obtain the total size of the available virtual memory present
+   * in the system.
+   *
+   * @return available virtual memory size in bytes.
+   */
+  public abstract long getAvailableVirtualMemorySize();
+
+  /**
+   * Obtain the total size of the available physical memory present
+   * in the system.
+   *
+   * @return available physical memory size bytes.
+   */
+  public abstract long getAvailablePhysicalMemorySize();
+
+  /**
+   * Obtain the total number of processors present on the system.
+   *
+   * @return number of processors
+   */
+  public abstract int getNumProcessors();
+
+  /**
+   * Obtain the CPU frequency of on the system.
+   *
+   * @return CPU frequency in kHz
+   */
+  public abstract long getCpuFrequency();
+
+  /**
+   * Obtain the cumulative CPU time since the system is on.
+   *
+   * @return cumulative CPU time in milliseconds
+   */
+  public abstract long getCumulativeCpuTime();
+
+  /**
+   * Obtain the CPU usage % of the machine. Return -1 if it is unavailable
+   *
+   * @return CPU usage in %
+   */
+  public abstract float getCpuUsage();
+
+  /**
+   * Obtain resource status used by current process tree.
+   */
+  public abstract ProcResourceValues getProcResourceValues();
+
+  public static class ProcResourceValues {
+    private final long cumulativeCpuTime;
+    private final long physicalMemorySize;
+    private final long virtualMemorySize;
+    public ProcResourceValues(long cumulativeCpuTime, long physicalMemorySize,
+                              long virtualMemorySize) {
+      this.cumulativeCpuTime = cumulativeCpuTime;
+      this.physicalMemorySize = physicalMemorySize;
+      this.virtualMemorySize = virtualMemorySize;
+    }
+    /**
+     * Obtain the physical memory size used by current process tree.
+     * @return physical memory size in bytes.
+     */
+    public long getPhysicalMemorySize() {
+      return physicalMemorySize;
+    }
+
+    /**
+     * Obtain the virtual memory size used by a current process tree.
+     * @return virtual memory size in bytes.
+     */
+    public long getVirtualMemorySize() {
+      return virtualMemorySize;
+    }
+
+    /**
+     * Obtain the cumulative CPU time used by a current process tree.
+     * @return cumulative CPU time in milliseconds
+     */
+    public long getCumulativeCpuTime() {
+      return cumulativeCpuTime;
+    }
+  }
+
+  /**
+   * Get the ResourceCalculatorPlugin from the class name and configure it. If
+   * class name is null, this method will try and return a memory calculator
+   * plugin available for this system.
+   * 
+   * @param clazz class-name
+   * @param conf configure the plugin with this.
+   * @return ResourceCalculatorPlugin
+   */
+  public static ResourceCalculatorPlugin getResourceCalculatorPlugin(
+      Class<? extends ResourceCalculatorPlugin> clazz, Configuration conf) {
+
+    if (clazz != null) {
+      return ReflectionUtils.newInstance(clazz, conf);
+    }
+
+    // No class given, try a os specific class
+    try {
+      String osName = System.getProperty("os.name");
+      if (osName.startsWith("Linux")) {
+        return new LinuxResourceCalculatorPlugin();
+      }
+    } catch (SecurityException se) {
+      // Failed to get Operating System name.
+      return null;
+    }
+
+    // Not supported on this system.
+    return null;
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-205/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/mapred/mapred-default.xml?rev=1176934&r1=1176933&r2=1176934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/mapred/mapred-default.xml Wed Sep 28 15:53:40 2011
@@ -186,14 +186,14 @@
 -->
 
 <property>
-  <name>mapred.tasktracker.memory_calculator_plugin</name>
+  <name>mapred.tasktracker.resourcecalculatorplugin</name>
   <value></value>
   <description>
-   Name of the class whose instance will be used to query memory information
+   Name of the class whose instance will be used to query resource information
    on the tasktracker.
    
    The class must be an instance of 
-   org.apache.hadoop.util.MemoryCalculatorPlugin. If the value is null, the
+   org.apache.hadoop.util.ResourceCalculatorPlugin. If the value is null, the
    tasktracker attempts to use a class appropriate to the platform. 
    Currently, the only platform supported is Linux.
   </description>

Modified: hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=1176934&r1=1176933&r2=1176934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java Wed Sep 28 15:53:40 2011
@@ -69,8 +69,10 @@ interface InterTrackerProtocol extends V
    * Version 26: Added numRequiredSlots to TaskStatus for MAPREDUCE-516
    * Version 27: Adding node health status to TaskStatus for MAPREDUCE-211
    * Version 28: Adding user name to the serialized Task for use by TT.
+   * Version 29: Adding available memory and CPU usage information on TT to
+   *             TaskTrackerStatus for MAPREDUCE-1218
    */
-  public static final long versionID = 28L;
+  public static final long versionID = 29L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;

Modified: hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1176934&r1=1176933&r2=1176934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/JobTracker.java Wed Sep 28 15:53:40 2011
@@ -4468,9 +4468,9 @@ public class JobTracker implements MRCon
     }
   }
   
-  static final String MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY =
+  public static final String MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY =
       "mapred.cluster.map.memory.mb";
-  static final String MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY =
+  public static final String MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY =
       "mapred.cluster.reduce.memory.mb";
 
   static final String MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY =

Modified: hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1176934&r1=1176933&r2=1176934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/Task.java Wed Sep 28 15:53:40 2011
@@ -50,10 +50,11 @@ import org.apache.hadoop.io.serializer.S
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.ResourceCalculatorPlugin;
+import org.apache.hadoop.util.ResourceCalculatorPlugin.ProcResourceValues;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.fs.FSDataInputStream;
 
@@ -85,7 +86,11 @@ abstract public class Task implements Wr
     REDUCE_SKIPPED_GROUPS,
     REDUCE_SKIPPED_RECORDS,
     SPILLED_RECORDS,
-    SPLIT_RAW_BYTES
+    SPLIT_RAW_BYTES,
+    CPU_MILLISECONDS,
+    PHYSICAL_MEMORY_BYTES,
+    VIRTUAL_MEMORY_BYTES,
+    COMMITTED_HEAP_BYTES
   }
   
   /**
@@ -143,6 +148,9 @@ abstract public class Task implements Wr
   private Iterator<Long> currentRecIndexIterator = 
     skipRanges.skipRangeIterator();
   
+  private ResourceCalculatorPlugin resourceCalculator = null;
+  private long initCpuCumulativeTime = 0;
+
   protected JobConf conf;
   protected MapOutputFile mapOutputFile = new MapOutputFile();
   protected LocalDirAllocator lDirAlloc;
@@ -518,6 +526,16 @@ abstract public class Task implements Wr
       }
     }
     committer.setupTask(taskContext);
+    Class<? extends ResourceCalculatorPlugin> clazz =
+        conf.getClass(TaskTracker.TT_RESOURCE_CALCULATOR_PLUGIN,
+            null, ResourceCalculatorPlugin.class);
+    resourceCalculator = ResourceCalculatorPlugin
+            .getResourceCalculatorPlugin(clazz, conf);
+    LOG.info(" Using ResourceCalculatorPlugin : " + resourceCalculator);
+    if (resourceCalculator != null) {
+      initCpuCumulativeTime =
+        resourceCalculator.getProcResourceValues().getCumulativeCpuTime();
+    }
   }
   
   protected class TaskReporter 
@@ -698,6 +716,7 @@ abstract public class Task implements Wr
       }
     }
     public void stopCommunicationThread() throws InterruptedException {
+      // Updating resources specified in ResourceCalculatorPlugin
       if (pingThread != null) {
         synchronized(lock) {
           while(!done) {
@@ -776,6 +795,27 @@ abstract public class Task implements Wr
   private Map<String, FileSystemStatisticUpdater> statisticUpdaters =
      new HashMap<String, FileSystemStatisticUpdater>();
   
+  /**
+   * Update resource information counters
+   */
+   void updateResourceCounters() {
+     // Update generic resource counters
+     updateHeapUsageCounter();
+     
+     if (resourceCalculator == null) {
+       return;
+     }
+     ProcResourceValues res = resourceCalculator.getProcResourceValues();
+     long cpuTime = res.getCumulativeCpuTime();
+     long pMem = res.getPhysicalMemorySize();
+     long vMem = res.getVirtualMemorySize();
+     // Remove the CPU time consumed previously by JVM reuse
+     cpuTime -= initCpuCumulativeTime;
+     counters.findCounter(Counter.CPU_MILLISECONDS).setValue(cpuTime);
+     counters.findCounter(Counter.PHYSICAL_MEMORY_BYTES).setValue(pMem);
+     counters.findCounter(Counter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
+   }
+  
   private synchronized void updateCounters() {
     for(Statistics stat: FileSystem.getAllStatistics()) {
       String uriScheme = stat.getScheme();
@@ -786,6 +826,19 @@ abstract public class Task implements Wr
       }
       updater.updateCounters();      
     }
+    // TODO Should CPU related counters be update only once i.e in the end
+    updateResourceCounters();
+  }
+
+  /**
+   * Updates the {@link TaskCounter#COMMITTED_HEAP_BYTES} counter to reflect the
+   * current total committed heap space usage of this JVM.
+   */
+  @SuppressWarnings("deprecation")
+  private void updateHeapUsageCounter() {
+    long currentHeapUsage = Runtime.getRuntime().totalMemory();
+    counters.findCounter(Counter.COMMITTED_HEAP_BYTES)
+            .setValue(currentHeapUsage);
   }
 
   public void done(TaskUmbilicalProtocol umbilical,

Modified: hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1176934&r1=1176933&r2=1176934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Wed Sep 28 15:53:40 2011
@@ -92,6 +92,7 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.MemoryCalculatorPlugin;
+import org.apache.hadoop.util.ResourceCalculatorPlugin;
 import org.apache.hadoop.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -347,11 +348,14 @@ public class TaskTracker implements MRCo
   private long mapSlotMemorySizeOnTT = JobConf.DISABLED_MEMORY_LIMIT;
   private long reduceSlotSizeMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
   private long totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT;
+  private ResourceCalculatorPlugin resourceCalculatorPlugin = null;
 
   private UserLogManager userLogManager;
 
   static final String MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY =
       "mapred.tasktracker.memory_calculator_plugin";
+  public static final String TT_RESOURCE_CALCULATOR_PLUGIN = 
+      "mapreduce.tasktracker.resourcecalculatorplugin";
 
   /**
    * the minimum interval between jobtracker polls
@@ -802,6 +806,12 @@ public class TaskTracker implements MRCo
                              taskTrackerName);
     mapEventsFetcher.start();
 
+    Class<? extends ResourceCalculatorPlugin> clazz =
+        fConf.getClass(TT_RESOURCE_CALCULATOR_PLUGIN,
+                       null, ResourceCalculatorPlugin.class);
+    resourceCalculatorPlugin = 
+      ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, fConf);
+    LOG.info(" Using ResourceCalculatorPlugin : " + resourceCalculatorPlugin);
     initializeMemoryManagement();
 
     getUserLogManager().clearOldUserLogs(fConf);
@@ -1739,6 +1749,12 @@ public class TaskTracker implements MRCo
       long freeDiskSpace = getFreeSpace();
       long totVmem = getTotalVirtualMemoryOnTT();
       long totPmem = getTotalPhysicalMemoryOnTT();
+      long availableVmem = getAvailableVirtualMemoryOnTT();
+      long availablePmem = getAvailablePhysicalMemoryOnTT();
+      long cumuCpuTime = getCumulativeCpuTimeOnTT();
+      long cpuFreq = getCpuFrequencyOnTT();
+      int numCpu = getNumProcessorsOnTT();
+      float cpuUsage = getCpuUsageOnTT();
 
       status.getResourceStatus().setAvailableSpace(freeDiskSpace);
       status.getResourceStatus().setTotalVirtualMemory(totVmem);
@@ -1747,6 +1763,12 @@ public class TaskTracker implements MRCo
           mapSlotMemorySizeOnTT);
       status.getResourceStatus().setReduceSlotMemorySizeOnTT(
           reduceSlotSizeMemoryOnTT);
+      status.getResourceStatus().setAvailableVirtualMemory(availableVmem); 
+      status.getResourceStatus().setAvailablePhysicalMemory(availablePmem);
+      status.getResourceStatus().setCumulativeCpuTime(cumuCpuTime);
+      status.getResourceStatus().setCpuFrequency(cpuFreq);
+      status.getResourceStatus().setNumProcessors(numCpu);
+      status.getResourceStatus().setCpuUsage(cpuUsage);
     }
     //add node health information
     
@@ -1819,6 +1841,80 @@ public class TaskTracker implements MRCo
     return totalPhysicalMemoryOnTT;
   }
 
+  /**
+   * Return the free virtual memory available on this TaskTracker.
+   * @return total size of free virtual memory.
+   */
+  long getAvailableVirtualMemoryOnTT() {
+    long availableVirtualMemoryOnTT = TaskTrackerStatus.UNAVAILABLE;
+    if (resourceCalculatorPlugin != null) {
+      availableVirtualMemoryOnTT =
+              resourceCalculatorPlugin.getAvailableVirtualMemorySize();
+    }
+    return availableVirtualMemoryOnTT;
+  }
+
+  /**
+   * Return the free physical memory available on this TaskTracker.
+   * @return total size of free physical memory in bytes
+   */
+  long getAvailablePhysicalMemoryOnTT() {
+    long availablePhysicalMemoryOnTT = TaskTrackerStatus.UNAVAILABLE;
+    if (resourceCalculatorPlugin != null) {
+      availablePhysicalMemoryOnTT =
+              resourceCalculatorPlugin.getAvailablePhysicalMemorySize();
+    }
+    return availablePhysicalMemoryOnTT;
+  }
+
+  /**
+   * Return the cumulative CPU used time on this TaskTracker since system is on
+   * @return cumulative CPU used time in millisecond
+   */
+  long getCumulativeCpuTimeOnTT() {
+    long cumulativeCpuTime = TaskTrackerStatus.UNAVAILABLE;
+    if (resourceCalculatorPlugin != null) {
+      cumulativeCpuTime = resourceCalculatorPlugin.getCumulativeCpuTime();
+    }
+    return cumulativeCpuTime;
+  }
+
+  /**
+   * Return the number of Processors on this TaskTracker
+   * @return number of processors
+   */
+  int getNumProcessorsOnTT() {
+    int numProcessors = TaskTrackerStatus.UNAVAILABLE;
+    if (resourceCalculatorPlugin != null) {
+      numProcessors = resourceCalculatorPlugin.getNumProcessors();
+    }
+    return numProcessors;
+  }
+
+  /**
+   * Return the CPU frequency of this TaskTracker
+   * @return CPU frequency in kHz
+   */
+  long getCpuFrequencyOnTT() {
+    long cpuFrequency = TaskTrackerStatus.UNAVAILABLE;
+    if (resourceCalculatorPlugin != null) {
+      cpuFrequency = resourceCalculatorPlugin.getCpuFrequency();
+    }
+    return cpuFrequency;
+  }
+
+  /**
+   * Return the CPU usage in % of this TaskTracker
+   * @return CPU usage in %
+   */
+  float getCpuUsageOnTT() {
+    float cpuUsage = TaskTrackerStatus.UNAVAILABLE;
+    if (resourceCalculatorPlugin != null) {
+      cpuUsage = resourceCalculatorPlugin.getCpuUsage();
+    }
+    return cpuUsage;
+  }
+  
   long getTotalMemoryAllottedForTasksOnTT() {
     return totalMemoryAllottedForTasks;
   }
@@ -3974,25 +4070,31 @@ public class TaskTracker implements MRCo
           JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY));
     }
 
-    Class<? extends MemoryCalculatorPlugin> clazz =
-        fConf.getClass(MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
-            null, MemoryCalculatorPlugin.class);
-    MemoryCalculatorPlugin memoryCalculatorPlugin =
-        MemoryCalculatorPlugin
-            .getMemoryCalculatorPlugin(clazz, fConf);
-    LOG.info(" Using MemoryCalculatorPlugin : " + memoryCalculatorPlugin);
-
-    if (memoryCalculatorPlugin != null) {
-      totalVirtualMemoryOnTT = memoryCalculatorPlugin.getVirtualMemorySize();
+    // Use TT_RESOURCE_CALCULATOR_PLUGIN if it is configured.
+    Class<? extends MemoryCalculatorPlugin> clazz = 
+      fConf.getClass(MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY, 
+                     null, MemoryCalculatorPlugin.class); 
+    MemoryCalculatorPlugin memoryCalculatorPlugin = 
+      (clazz == null 
+       ? null 
+       : MemoryCalculatorPlugin.getMemoryCalculatorPlugin(clazz, fConf)); 
+    if (memoryCalculatorPlugin != null || resourceCalculatorPlugin != null) {
+      totalVirtualMemoryOnTT = 
+        (memoryCalculatorPlugin == null 
+         ? resourceCalculatorPlugin.getVirtualMemorySize() 
+         : memoryCalculatorPlugin.getVirtualMemorySize());
       if (totalVirtualMemoryOnTT <= 0) {
         LOG.warn("TaskTracker's totalVmem could not be calculated. "
-            + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT);
+                 + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT);
         totalVirtualMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
       }
-      totalPhysicalMemoryOnTT = memoryCalculatorPlugin.getPhysicalMemorySize();
+      totalPhysicalMemoryOnTT = 
+        (memoryCalculatorPlugin == null 
+         ? resourceCalculatorPlugin.getPhysicalMemorySize() 
+         : memoryCalculatorPlugin.getPhysicalMemorySize());
       if (totalPhysicalMemoryOnTT <= 0) {
         LOG.warn("TaskTracker's totalPmem could not be calculated. "
-            + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT);
+                 + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT);
         totalPhysicalMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
       }
     }

Modified: hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=1176934&r1=1176933&r2=1176934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Wed Sep 28 15:53:40 2011
@@ -55,6 +55,7 @@ public class TaskTrackerStatus implement
   private int maxReduceTasks;
   private TaskTrackerHealthStatus healthStatus;
    
+  public static final int UNAVAILABLE = -1;
   /**
    * Class representing a collection of resources on this tasktracker.
    */
@@ -66,6 +67,13 @@ public class TaskTrackerStatus implement
     private long reduceSlotMemorySizeOnTT;
     private long availableSpace;
     
+    private long availableVirtualMemory = UNAVAILABLE; // in byte
+    private long availablePhysicalMemory = UNAVAILABLE; // in byte
+    private int numProcessors = UNAVAILABLE;
+    private long cumulativeCpuTime = UNAVAILABLE; // in millisecond
+    private long cpuFrequency = UNAVAILABLE; // in kHz
+    private float cpuUsage = UNAVAILABLE; // in %
+
     ResourceStatus() {
       totalVirtualMemory = JobConf.DISABLED_MEMORY_LIMIT;
       totalPhysicalMemory = JobConf.DISABLED_MEMORY_LIMIT;
@@ -172,21 +180,160 @@ public class TaskTrackerStatus implement
     long getAvailableSpace() {
       return availableSpace;
     }
+
+    /**
+     * Set the amount of available virtual memory on the tasktracker.
+     * If the input is not a valid number, it will be set to UNAVAILABLE
+     *
+     * @param vmem amount of available virtual memory on the tasktracker
+     *                    in bytes.
+     */
+    void setAvailableVirtualMemory(long availableMem) {
+      availableVirtualMemory = availableMem > 0 ?
+                               availableMem : UNAVAILABLE;
+    }
+
+    /**
+     * Get the amount of available virtual memory on the tasktracker.
+     * Will return UNAVAILABLE if it cannot be obtained
+     *
+     * @return the amount of available virtual memory on the tasktracker
+     *             in bytes.
+     */
+    long getAvailableVirtualMemory() {
+      return availableVirtualMemory;
+    }
+
+    /**
+     * Set the amount of available physical memory on the tasktracker.
+     * If the input is not a valid number, it will be set to UNAVAILABLE
+     *
+     * @param availableRAM amount of available physical memory on the
+     *                     tasktracker in bytes.
+     */
+    void setAvailablePhysicalMemory(long availableRAM) {
+      availablePhysicalMemory = availableRAM > 0 ?
+                                availableRAM : UNAVAILABLE;
+    }
+
+    /**
+     * Get the amount of available physical memory on the tasktracker.
+     * Will return UNAVAILABLE if it cannot be obtained
+     *
+     * @return amount of available physical memory on the tasktracker in bytes.
+     */
+    long getAvailablePhysicalMemory() {
+      return availablePhysicalMemory;
+    }
+
+    /**
+     * Set the CPU frequency of this TaskTracker
+     * If the input is not a valid number, it will be set to UNAVAILABLE
+     *
+     * @param cpuFrequency CPU frequency in kHz
+     */
+    public void setCpuFrequency(long cpuFrequency) {
+      this.cpuFrequency = cpuFrequency > 0 ?
+                          cpuFrequency : UNAVAILABLE;
+    }
+
+    /**
+     * Get the CPU frequency of this TaskTracker
+     * Will return UNAVAILABLE if it cannot be obtained
+     *
+     * @return CPU frequency in kHz
+     */
+    public long getCpuFrequency() {
+      return cpuFrequency;
+    }
+
+    /**
+     * Set the number of processors on this TaskTracker
+     * If the input is not a valid number, it will be set to UNAVAILABLE
+     *
+     * @param numProcessors number of processors
+     */
+    public void setNumProcessors(int numProcessors) {
+      this.numProcessors = numProcessors > 0 ?
+                           numProcessors : UNAVAILABLE;
+    }
+
+    /**
+     * Get the number of processors on this TaskTracker
+     * Will return UNAVAILABLE if it cannot be obtained
+     *
+     * @return number of processors
+     */
+    public int getNumProcessors() {
+      return numProcessors;
+    }
+
+    /**
+     * Set the cumulative CPU time on this TaskTracker since it is up
+     * It can be set to UNAVAILABLE if it is currently unavailable.
+     *
+     * @param cumulativeCpuTime Used CPU time in millisecond
+     */
+    public void setCumulativeCpuTime(long cumulativeCpuTime) {
+      this.cumulativeCpuTime = cumulativeCpuTime > 0 ?
+                               cumulativeCpuTime : UNAVAILABLE;
+    }
+
+    /**
+     * Get the cumulative CPU time on this TaskTracker since it is up
+     * Will return UNAVAILABLE if it cannot be obtained
+     *
+     * @return used CPU time in milliseconds
+     */
+    public long getCumulativeCpuTime() {
+      return cumulativeCpuTime;
+    }
+    
+    /**
+     * Set the CPU usage on this TaskTracker
+     * 
+     * @param cpuUsage CPU usage in %
+     */
+    public void setCpuUsage(float cpuUsage) {
+      this.cpuUsage = cpuUsage;
+    }
+
+    /**
+     * Get the CPU usage on this TaskTracker
+     * Will return UNAVAILABLE if it cannot be obtained
+     *
+     * @return CPU usage in %
+     */
+    public float getCpuUsage() {
+      return cpuUsage;
+    }
     
     public void write(DataOutput out) throws IOException {
       WritableUtils.writeVLong(out, totalVirtualMemory);
       WritableUtils.writeVLong(out, totalPhysicalMemory);
+      WritableUtils.writeVLong(out, availableVirtualMemory);
+      WritableUtils.writeVLong(out, availablePhysicalMemory);
       WritableUtils.writeVLong(out, mapSlotMemorySizeOnTT);
       WritableUtils.writeVLong(out, reduceSlotMemorySizeOnTT);
       WritableUtils.writeVLong(out, availableSpace);
+      WritableUtils.writeVLong(out, cumulativeCpuTime);
+      WritableUtils.writeVLong(out, cpuFrequency);
+      WritableUtils.writeVInt(out, numProcessors);
+      out.writeFloat(getCpuUsage());
     }
     
     public void readFields(DataInput in) throws IOException {
       totalVirtualMemory = WritableUtils.readVLong(in);
       totalPhysicalMemory = WritableUtils.readVLong(in);
+      availableVirtualMemory = WritableUtils.readVLong(in);
+      availablePhysicalMemory = WritableUtils.readVLong(in);
       mapSlotMemorySizeOnTT = WritableUtils.readVLong(in);
       reduceSlotMemorySizeOnTT = WritableUtils.readVLong(in);
       availableSpace = WritableUtils.readVLong(in);
+      cumulativeCpuTime = WritableUtils.readVLong(in);
+      cpuFrequency = WritableUtils.readVLong(in);
+      numProcessors = WritableUtils.readVInt(in);
+      setCpuUsage(in.readFloat());
     }
   }
   

Modified: hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties?rev=1176934&r1=1176933&r2=1176934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties Wed Sep 28 15:53:40 2011
@@ -17,4 +17,8 @@ REDUCE_OUTPUT_RECORDS.name=    Reduce ou
 REDUCE_SKIPPED_RECORDS.name=   Reduce skipped records
 REDUCE_SKIPPED_GROUPS.name=    Reduce skipped groups
 SPILLED_RECORDS.name=          Spilled Records
+COMMITTED_HEAP_BYTES.name=     Total committed heap usage (bytes)
+CPU_MILLISECONDS.name=         CPU time spent (ms)
+PHYSICAL_MEMORY_BYTES.name=    Physical memory (bytes) snapshot
+VIRTUAL_MEMORY_BYTES.name=     Virtual memory (bytes) snapshot
 

Modified: hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapreduce/Counter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapreduce/Counter.java?rev=1176934&r1=1176933&r2=1176934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapreduce/Counter.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapreduce/Counter.java Wed Sep 28 15:53:40 2011
@@ -104,6 +104,14 @@ public class Counter implements Writable
   }
     
   /**
+   * Set this counter by the given value
+   * @param value the value to set
+   */
+  public synchronized void setValue(long value) {
+    this.value = value;
+  }
+
+  /**
    * Increment this counter by the given value
    * @param incr the value to increase this counter by
    */

Modified: hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/mapred/TestCounters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/mapred/TestCounters.java?rev=1176934&r1=1176933&r2=1176934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/mapred/TestCounters.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/mapred/TestCounters.java Wed Sep 28 15:53:40 2011
@@ -20,6 +20,9 @@ package org.apache.hadoop.mapred;
 import junit.framework.TestCase;
 import java.io.IOException;
 import java.text.ParseException;
+import java.util.Random;
+
+import org.apache.hadoop.mapred.Counters.Counter;
 
 /**
  * TestCounters checks the sanity and recoverability of {@code Counters}
@@ -90,6 +93,33 @@ public class TestCounters extends TestCa
     }
   }
   
+  /**
+   * Verify counter value works
+   */
+  public void testCounterValue() {
+    final int NUMBER_TESTS = 100;
+    final int NUMBER_INC = 10;
+    final Random rand = new Random();
+    for (int i = 0; i < NUMBER_TESTS; i++) {
+      long initValue = rand.nextInt();
+      long expectedValue = initValue;
+      Counter counter = new Counter("foo", "bar", expectedValue);
+      assertEquals("Counter value is not initialized correctly",
+                   expectedValue, counter.getValue());
+      for (int j = 0; j < NUMBER_INC; j++) {
+        int incValue = rand.nextInt();
+        counter.increment(incValue);
+        expectedValue += incValue;
+        assertEquals("Counter value is not incremented correctly",
+                     expectedValue, counter.getValue());
+      }
+      expectedValue = rand.nextInt();
+      counter.setValue(expectedValue);
+      assertEquals("Counter value is not set correctly",
+                   expectedValue, counter.getValue());
+    }
+  }
+  
   public static void main(String[] args) throws IOException {
     new TestCounters().testCounters();
   }

Modified: hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/mapred/TestJobCounters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/mapred/TestJobCounters.java?rev=1176934&r1=1176933&r2=1176934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/mapred/TestJobCounters.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/mapred/TestJobCounters.java Wed Sep 28 15:53:40 2011
@@ -23,12 +23,12 @@ import java.io.FileWriter;
 import java.io.Writer;
 import java.io.BufferedWriter;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 import java.util.StringTokenizer;
 
 import junit.framework.TestCase;
-import junit.extensions.TestSetup;
-import junit.framework.Test;
-import junit.framework.TestSuite;
 
 import static org.apache.hadoop.mapred.Task.Counter.SPILLED_RECORDS;
 import static org.apache.hadoop.mapred.Task.Counter.MAP_INPUT_RECORDS;
@@ -36,12 +36,17 @@ import static org.apache.hadoop.mapred.T
 import static org.apache.hadoop.mapred.Task.Counter.MAP_INPUT_BYTES;
 import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_BYTES;
 import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_MATERIALIZED_BYTES;
+import static org.apache.hadoop.mapred.Task.Counter.COMMITTED_HEAP_BYTES;
 
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
@@ -450,4 +455,249 @@ public class TestJobCounters extends Tes
       }
     }
   }
+  
+  /** 
+   * Increases the JVM's heap usage to the specified target value.
+   */
+  static class MemoryLoader {
+    private static final int DEFAULT_UNIT_LOAD_SIZE = 10 * 1024 * 1024; // 10mb
+    
+    // the target value to reach
+    private long targetValue;
+    // a list to hold the load objects
+    private List<String> loadObjects = new ArrayList<String>();
+    
+    MemoryLoader(long targetValue) {
+      this.targetValue = targetValue;
+    }
+    
+    /**
+     * Loads the memory to the target value.
+     */
+    void load() {
+      while (Runtime.getRuntime().totalMemory() < targetValue) {
+        System.out.println("Loading memory with " + DEFAULT_UNIT_LOAD_SIZE 
+                           + " characters. Current usage : " 
+                           + Runtime.getRuntime().totalMemory());
+        // load some objects in the memory
+        loadObjects.add(RandomStringUtils.random(DEFAULT_UNIT_LOAD_SIZE));
+
+        // sleep for 100ms
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException ie) {}
+      }
+    }
+  }
+
+  /**
+   * A mapper that increases the JVM's heap usage to a target value configured 
+   * via {@link MemoryLoaderMapper#TARGET_VALUE} using a {@link MemoryLoader}.
+   */
+  @SuppressWarnings({"deprecation", "unchecked"})
+  static class MemoryLoaderMapper 
+  extends MapReduceBase 
+  implements org.apache.hadoop.mapred.Mapper<WritableComparable, Writable, 
+                    WritableComparable, Writable> {
+    static final String TARGET_VALUE = "map.memory-loader.target-value";
+    
+    private static MemoryLoader loader = null;
+    
+    public void map(WritableComparable key, Writable val, 
+                    OutputCollector<WritableComparable, Writable> output,
+                    Reporter reporter)
+    throws IOException {
+      assertNotNull("Mapper not configured!", loader);
+      
+      // load the memory
+      loader.load();
+      
+      // work as identity mapper
+      output.collect(key, val);
+    }
+
+    public void configure(JobConf conf) {
+      loader = new MemoryLoader(conf.getLong(TARGET_VALUE, -1));
+    }
+  }
+
+  /** 
+   * A reducer that increases the JVM's heap usage to a target value configured 
+   * via {@link MemoryLoaderReducer#TARGET_VALUE} using a {@link MemoryLoader}.
+   */
+  @SuppressWarnings({"deprecation", "unchecked"})
+  static class MemoryLoaderReducer extends MapReduceBase 
+  implements org.apache.hadoop.mapred.Reducer<WritableComparable, Writable, 
+                     WritableComparable, Writable> {
+    static final String TARGET_VALUE = "reduce.memory-loader.target-value";
+    private static MemoryLoader loader = null;
+    
+    public void reduce(WritableComparable key, Iterator<Writable> val, 
+                       OutputCollector<WritableComparable, Writable> output,
+                       Reporter reporter)
+    throws IOException {
+      assertNotNull("Reducer not configured!", loader);
+      
+      // load the memory
+      loader.load();
+      
+      // work as identity reducer
+      output.collect(key, key);
+    }
+
+    public void configure(JobConf conf) {
+      loader = new MemoryLoader(conf.getLong(TARGET_VALUE, -1));
+    }
+  }
+
+  @SuppressWarnings("deprecation")
+  private long getTaskCounterUsage (JobClient client, JobID id, int numReports,
+                                    int taskId, boolean isMap) 
+  throws Exception {
+    TaskReport[] reports = null;
+    if (isMap) {
+      reports = client.getMapTaskReports(id);
+    } else {
+      reports = client.getReduceTaskReports(id);
+    }
+    
+    assertNotNull("No reports found for " + (isMap? "map" : "reduce") + " tasks" 
+                  + "' in job " + id, reports);
+    // make sure that the total number of reports match the expected
+    assertEquals("Mismatch in task id", numReports, reports.length);
+    
+    Counters counters = reports[taskId].getCounters();
+    
+    return counters.getCounter(COMMITTED_HEAP_BYTES);
+  }
+
+  // set up heap options, target value for memory loader and the output 
+  // directory before running the job
+  @SuppressWarnings("deprecation")
+  private static RunningJob runHeapUsageTestJob(JobConf conf, Path testRootDir,
+                              String heapOptions, long targetMapValue,
+                              long targetReduceValue, FileSystem fs, 
+                              JobClient client, Path inDir) 
+  throws IOException {
+    // define a job
+    JobConf jobConf = new JobConf(conf);
+    
+    // configure the jobs
+    jobConf.setNumMapTasks(1);
+    jobConf.setNumReduceTasks(1);
+    jobConf.setMapperClass(MemoryLoaderMapper.class);
+    jobConf.setReducerClass(MemoryLoaderReducer.class);
+    jobConf.setInputFormat(TextInputFormat.class);
+    jobConf.setOutputKeyClass(LongWritable.class);
+    jobConf.setOutputValueClass(Text.class);
+    jobConf.setMaxMapAttempts(1);
+    jobConf.setMaxReduceAttempts(1);
+    jobConf.set(JobConf.MAPRED_TASK_JAVA_OPTS, heapOptions);
+    
+    // set the targets
+    jobConf.setLong(MemoryLoaderMapper.TARGET_VALUE, targetMapValue);
+    jobConf.setLong(MemoryLoaderReducer.TARGET_VALUE, targetReduceValue);
+    
+    // set the input directory for the job
+    FileInputFormat.setInputPaths(jobConf, inDir);
+    
+    // define job output folder
+    Path outDir = new Path(testRootDir, "out");
+    fs.delete(outDir, true);
+    FileOutputFormat.setOutputPath(jobConf, outDir);
+    
+    // run the job
+    RunningJob job = client.submitJob(jobConf);
+    job.waitForCompletion();
+    JobID jobID = job.getID();
+    assertTrue("Job " + jobID + " failed!", job.isSuccessful());
+    
+    return job;
+  }
+
+  /**
+   * Tests {@link TaskCounter}'s {@link TaskCounter.COMMITTED_HEAP_BYTES}. 
+   * The test consists of running a low-memory job which consumes less heap 
+   * memory and then running a high-memory job which consumes more heap memory, 
+   * and then ensuring that COMMITTED_HEAP_BYTES of low-memory job is smaller 
+   * than that of the high-memory job.
+   * @throws IOException
+   */
+  @SuppressWarnings("deprecation")
+  public void testHeapUsageCounter() throws Exception {
+    JobConf conf = new JobConf();
+    // create a local filesystem handle
+    FileSystem fileSystem = FileSystem.getLocal(conf);
+    
+    // define test root directories
+    File rootDir =
+      new File(System.getProperty("test.build.data", "/tmp"));
+    File testRootDir = new File(rootDir, "testHeapUsageCounter");
+    // cleanup the test root directory
+    Path testRootDirPath = new Path(testRootDir.toString());
+    fileSystem.delete(testRootDirPath, true);
+    // set the current working directory
+    fileSystem.setWorkingDirectory(testRootDirPath);
+    
+    fileSystem.deleteOnExit(testRootDirPath);
+    
+    // create a mini cluster using the local file system
+    MiniMRCluster mrCluster = 
+      new MiniMRCluster(1, fileSystem.getUri().toString(), 1);
+    
+    try {
+      conf = mrCluster.createJobConf();
+      JobClient jobClient = new JobClient(conf);
+
+      // define job input
+      File file = new File(testRootDir, "in");
+      Path inDir = new Path(file.toString());
+      // create input data
+      createWordsFile(file);
+
+      // configure and run a low memory job which will run without loading the
+      // jvm's heap
+      RunningJob lowMemJob = 
+        runHeapUsageTestJob(conf, testRootDirPath, "-Xms32m -Xmx1G", 
+                            0, 0, fileSystem, jobClient, inDir);
+      JobID lowMemJobID = lowMemJob.getID();
+      long lowMemJobMapHeapUsage = getTaskCounterUsage(jobClient, lowMemJobID, 
+                                                       1, 0, true);
+      System.out.println("Job1 (low memory job) map task heap usage: " 
+                         + lowMemJobMapHeapUsage);
+      long lowMemJobReduceHeapUsage =
+        getTaskCounterUsage(jobClient, lowMemJobID, 1, 0, false);
+      System.out.println("Job1 (low memory job) reduce task heap usage: " 
+                         + lowMemJobReduceHeapUsage);
+
+      // configure and run a high memory job which will load the jvm's heap
+      RunningJob highMemJob = 
+        runHeapUsageTestJob(conf, testRootDirPath, "-Xms32m -Xmx1G", 
+                            lowMemJobMapHeapUsage + 256*1024*1024, 
+                            lowMemJobReduceHeapUsage + 256*1024*1024,
+                            fileSystem, jobClient, inDir);
+      JobID highMemJobID = highMemJob.getID();
+
+      long highMemJobMapHeapUsage = getTaskCounterUsage(jobClient, highMemJobID,
+                                                        1, 0, true);
+      System.out.println("Job2 (high memory job) map task heap usage: " 
+                         + highMemJobMapHeapUsage);
+      long highMemJobReduceHeapUsage =
+        getTaskCounterUsage(jobClient, highMemJobID, 1, 0, false);
+      System.out.println("Job2 (high memory job) reduce task heap usage: " 
+                         + highMemJobReduceHeapUsage);
+
+      assertTrue("Incorrect map heap usage reported by the map task", 
+                 lowMemJobMapHeapUsage < highMemJobMapHeapUsage);
+
+      assertTrue("Incorrect reduce heap usage reported by the reduce task", 
+                 lowMemJobReduceHeapUsage < highMemJobReduceHeapUsage);
+    } finally {
+      // shutdown the mr cluster
+      mrCluster.shutdown();
+      try {
+        fileSystem.delete(testRootDirPath, true);
+      } catch (IOException ioe) {} 
+    }
+  }
 }
\ No newline at end of file

Modified: hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/mapred/TestTTMemoryReporting.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/mapred/TestTTMemoryReporting.java?rev=1176934&r1=1176933&r2=1176934&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/mapred/TestTTMemoryReporting.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/mapred/TestTTMemoryReporting.java Wed Sep 28 15:53:40 2011
@@ -1,230 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.examples.SleepJob;
-import org.apache.hadoop.util.LinuxMemoryCalculatorPlugin;
-import org.apache.hadoop.util.MemoryCalculatorPlugin;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
-
-import junit.framework.TestCase;
-
-/**
- * This test class tests the functionality related to configuring, reporting
- * and computing memory related parameters in a Map/Reduce cluster.
- * 
- * Each test sets up a {@link MiniMRCluster} with a locally defined 
- * {@link org.apache.hadoop.mapred.TaskScheduler}. This scheduler validates 
- * the memory related configuration is correctly computed and reported from 
- * the tasktracker in 
- * {@link org.apache.hadoop.mapred.TaskScheduler#assignTasks(TaskTrackerStatus)}.
- */
-public class TestTTMemoryReporting extends TestCase {
-
-  static final Log LOG = LogFactory.getLog(TestTTMemoryReporting.class);
-  
-  private MiniMRCluster miniMRCluster;
-
-  /**
-   * Fake scheduler to test the proper reporting of memory values by TT
-   */
-  public static class FakeTaskScheduler extends JobQueueTaskScheduler {
-    
-    private boolean hasPassed = true;
-    private String message;
-    
-    public FakeTaskScheduler() {
-      super();
-    }
-    
-    public boolean hasTestPassed() {
-      return hasPassed;
-    }
-    
-    public String getFailureMessage() {
-      return message;
-    }
-    
-    @Override
-    public List<Task> assignTasks(TaskTracker taskTracker)
-        throws IOException {
-      TaskTrackerStatus status = taskTracker.getStatus();
-      
-      long totalVirtualMemoryOnTT =
-          getConf().getLong("totalVmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
-      long totalPhysicalMemoryOnTT =
-          getConf().getLong("totalPmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
-      long mapSlotMemorySize =
-          getConf().getLong("mapSlotMemorySize", JobConf.DISABLED_MEMORY_LIMIT);
-      long reduceSlotMemorySize =
-          getConf()
-              .getLong("reduceSlotMemorySize", JobConf.DISABLED_MEMORY_LIMIT);
-
-      long reportedTotalVirtualMemoryOnTT =
-          status.getResourceStatus().getTotalVirtualMemory();
-      long reportedTotalPhysicalMemoryOnTT =
-          status.getResourceStatus().getTotalPhysicalMemory();
-      long reportedMapSlotMemorySize =
-          status.getResourceStatus().getMapSlotMemorySizeOnTT();
-      long reportedReduceSlotMemorySize =
-          status.getResourceStatus().getReduceSlotMemorySizeOnTT();
-
-      message =
-          "expected memory values : (totalVirtualMemoryOnTT, totalPhysicalMemoryOnTT, "
-              + "mapSlotMemSize, reduceSlotMemorySize) = ("
-              + totalVirtualMemoryOnTT + ", " + totalPhysicalMemoryOnTT + ","
-              + mapSlotMemorySize + "," + reduceSlotMemorySize + ")";
-      message +=
-          "\nreported memory values : (totalVirtualMemoryOnTT, totalPhysicalMemoryOnTT, "
-              + "reportedMapSlotMemorySize, reportedReduceSlotMemorySize) = ("
-              + reportedTotalVirtualMemoryOnTT
-              + ", "
-              + reportedTotalPhysicalMemoryOnTT
-              + ","
-              + reportedMapSlotMemorySize
-              + ","
-              + reportedReduceSlotMemorySize
-              + ")";
-      LOG.info(message);
-      if (totalVirtualMemoryOnTT != reportedTotalVirtualMemoryOnTT
-          || totalPhysicalMemoryOnTT != reportedTotalPhysicalMemoryOnTT
-          || mapSlotMemorySize != reportedMapSlotMemorySize
-          || reduceSlotMemorySize != reportedReduceSlotMemorySize) {
-        hasPassed = false;
-      }
-      return super.assignTasks(taskTracker);
-    }
-  }
-
-  /**
-   * Test that verifies default values are configured and reported correctly.
-   * 
-   * @throws Exception
-   */
-  public void testDefaultMemoryValues()
-      throws Exception {
-    JobConf conf = new JobConf();
-    try {
-      // Memory values are disabled by default.
-      conf.setClass(
-          org.apache.hadoop.mapred.TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
-          DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
-      setUpCluster(conf);
-      runSleepJob(miniMRCluster.createJobConf());
-      verifyTestResults();
-    } finally {
-      tearDownCluster();
-    }
-  }
-
-  /**
-   * Test that verifies that configured values are reported correctly.
-   * 
-   * @throws Exception
-   */
-  public void testConfiguredMemoryValues()
-      throws Exception {
-    JobConf conf = new JobConf();
-    conf.setLong("totalVmemOnTT", 4 * 1024 * 1024 * 1024L);
-    conf.setLong("totalPmemOnTT", 2 * 1024 * 1024 * 1024L);
-    conf.setLong("mapSlotMemorySize", 1 * 512L);
-    conf.setLong("reduceSlotMemorySize", 1 * 1024L);
-
-    conf.setClass(
-        org.apache.hadoop.mapred.TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
-        DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
-    conf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
-        4 * 1024 * 1024 * 1024L);
-    conf.setLong(DummyMemoryCalculatorPlugin.MAXPMEM_TESTING_PROPERTY,
-        2 * 1024 * 1024 * 1024L);
-    conf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
-        512L);
-    conf.setLong(
-        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1024L);
-    
-    try {
-      setUpCluster(conf);
-      JobConf jobConf = miniMRCluster.createJobConf();
-      jobConf.setMemoryForMapTask(1 * 1024L);
-      jobConf.setMemoryForReduceTask(2 * 1024L);
-      runSleepJob(jobConf);
-      verifyTestResults();
-    } finally {
-      tearDownCluster();
-    }
-  }
-
-  /**
-   * Test that verifies that total memory values are calculated and reported
-   * correctly.
-   * 
-   * @throws Exception
-   */
-  public void testMemoryValuesOnLinux()
-      throws Exception {
-    if (!System.getProperty("os.name").startsWith("Linux")) {
-      return;
-    }
-
-    JobConf conf = new JobConf();
-    LinuxMemoryCalculatorPlugin plugin = new LinuxMemoryCalculatorPlugin();
-    conf.setLong("totalVmemOnTT", plugin.getVirtualMemorySize());
-    conf.setLong("totalPmemOnTT", plugin.getPhysicalMemorySize());
-
-    try {
-      setUpCluster(conf);
-      runSleepJob(miniMRCluster.createJobConf());
-      verifyTestResults();
-    } finally {
-      tearDownCluster();
-    }
-  }
-
-  private void setUpCluster(JobConf conf)
-                                throws Exception {
-    conf.setClass("mapred.jobtracker.taskScheduler",
-        TestTTMemoryReporting.FakeTaskScheduler.class, TaskScheduler.class);
-    conf.set("mapred.job.tracker.handler.count", "1");
-    miniMRCluster = new MiniMRCluster(1, "file:///", 3, null, null, conf);
-  }
-  
-  private void runSleepJob(JobConf conf) throws Exception {
-    String[] args = { "-m", "1", "-r", "1",
-                      "-mt", "10", "-rt", "10" };
-    ToolRunner.run(conf, new SleepJob(), args);
-  }
-
-  private void verifyTestResults() {
-    FakeTaskScheduler scheduler = 
-      (FakeTaskScheduler)miniMRCluster.getJobTrackerRunner().
-                              getJobTracker().getTaskScheduler();
-    assertTrue(scheduler.getFailureMessage(), scheduler.hasTestPassed());
-  }
-  
-  private void tearDownCluster() {
-    if (miniMRCluster != null) {
-      miniMRCluster.shutdown();
-    }
-  }
-}



Mime
View raw message