hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject svn commit: r1179465 [1/3] - in /hadoop/common/branches/branch-0.20-security-205/src: core/org/apache/hadoop/util/ mapred/ mapred/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapreduce/ test/org/apache/hadoop/mapred/ test/org/apache/hadoop/util/
Date Wed, 05 Oct 2011 22:37:53 GMT
Author: mattf
Date: Wed Oct  5 22:37:52 2011
New Revision: 1179465

URL: http://svn.apache.org/viewvc?rev=1179465&view=rev
Log:
MAPREDUCE-2777. Merged back into 0.20-security-205, along with r1177389 to delete the now-empty test file TestTTMemoryReporting.

Added:
    hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/LinuxResourceCalculatorPlugin.java
    hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/ResourceCalculatorPlugin.java
    hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/TaskTracker.java.orig
    hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/mapred/TestTTResourceReporting.java
    hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/util/DummyResourceCalculatorPlugin.java
    hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/util/TestLinuxResourceCalculatorPlugin.java
Removed:
    hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/mapred/TestTTMemoryReporting.java
Modified:
    hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java
    hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/MemoryCalculatorPlugin.java
    hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java
    hadoop/common/branches/branch-0.20-security-205/src/mapred/mapred-default.xml
    hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
    hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
    hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
    hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapreduce/Counter.java
    hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/mapred/TestCounters.java
    hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/mapred/TestJobCounters.java
    hadoop/common/branches/branch-0.20-security-205/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java

Modified: hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java?rev=1179465&r1=1179464&r2=1179465&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java Wed Oct  5 22:37:52 2011
@@ -18,115 +18,28 @@
 
 package org.apache.hadoop.util;
 
-import java.io.BufferedReader;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 /**
  * Plugin to calculate virtual and physical memories on Linux systems.
+ * @deprecated Use {@link org.apache.hadoop.util.LinuxResourceCalculatorPlugin}
+ *             instead
  */
+@Deprecated
 public class LinuxMemoryCalculatorPlugin extends MemoryCalculatorPlugin {
-  private static final Log LOG =
-      LogFactory.getLog(LinuxMemoryCalculatorPlugin.class);
-
-  /**
-   * proc's meminfo virtual file has keys-values in the format
-   * "key:[ \t]*value[ \t]kB".
-   */
-  private static final String PROCFS_MEMFILE = "/proc/meminfo";
-  private static final Pattern PROCFS_MEMFILE_FORMAT =
-      Pattern.compile("^([a-zA-Z]*):[ \t]*([0-9]*)[ \t]kB");
-
-  // We just need the values for the keys MemTotal and SwapTotal
-  private static final String MEMTOTAL_STRING = "MemTotal";
-  private static final String SWAPTOTAL_STRING = "SwapTotal";
-
-  private long ramSize = 0;
-  private long swapSize = 0;
-
-  boolean readMemInfoFile = false;
-
-  private void readProcMemInfoFile() {
-
-    if (readMemInfoFile) {
-      return;
-    }
-
-    // Read "/proc/memInfo" file
-    BufferedReader in = null;
-    FileReader fReader = null;
-    try {
-      fReader = new FileReader(PROCFS_MEMFILE);
-      in = new BufferedReader(fReader);
-    } catch (FileNotFoundException f) {
-      // shouldn't happen....
-      return;
-    }
-
-    Matcher mat = null;
-
-    try {
-      String str = in.readLine();
-      while (str != null) {
-        mat = PROCFS_MEMFILE_FORMAT.matcher(str);
-        if (mat.find()) {
-          if (mat.group(1).equals(MEMTOTAL_STRING)) {
-            ramSize = Long.parseLong(mat.group(2));
-          } else if (mat.group(1).equals(SWAPTOTAL_STRING)) {
-            swapSize = Long.parseLong(mat.group(2));
-          }
-        }
-        str = in.readLine();
-      }
-    } catch (IOException io) {
-      LOG.warn("Error reading the stream " + io);
-    } finally {
-      // Close the streams
-      try {
-        fReader.close();
-        try {
-          in.close();
-        } catch (IOException i) {
-          LOG.warn("Error closing the stream " + in);
-        }
-      } catch (IOException i) {
-        LOG.warn("Error closing the stream " + fReader);
-      }
-    }
-
-    readMemInfoFile = true;
+  private LinuxResourceCalculatorPlugin resourceCalculatorPlugin;
+  // Use everything from LinuxResourceCalculatorPlugin
+  public LinuxMemoryCalculatorPlugin() {
+    resourceCalculatorPlugin = new LinuxResourceCalculatorPlugin();
   }
 
   /** {@inheritDoc} */
   @Override
   public long getPhysicalMemorySize() {
-    readProcMemInfoFile();
-    return ramSize * 1024;
+    return resourceCalculatorPlugin.getPhysicalMemorySize();
   }
 
   /** {@inheritDoc} */
   @Override
   public long getVirtualMemorySize() {
-    readProcMemInfoFile();
-    return (ramSize + swapSize) * 1024;
-  }
-
-  /**
-   * Test the {@link LinuxMemoryCalculatorPlugin}
-   * 
-   * @param args
-   */
-  public static void main(String[] args) {
-    LinuxMemoryCalculatorPlugin plugin = new LinuxMemoryCalculatorPlugin();
-    System.out.println("Physical memory Size(bytes) : "
-        + plugin.getPhysicalMemorySize());
-    System.out.println("Total Virtual memory Size(bytes) : "
-        + plugin.getVirtualMemorySize());
+    return resourceCalculatorPlugin.getVirtualMemorySize();
   }
 }
\ No newline at end of file

Added: hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/LinuxResourceCalculatorPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/LinuxResourceCalculatorPlugin.java?rev=1179465&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/LinuxResourceCalculatorPlugin.java (added)
+++ hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/LinuxResourceCalculatorPlugin.java Wed Oct  5 22:37:52 2011
@@ -0,0 +1,407 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Plugin to calculate resource information on Linux systems.
+ */
+public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin {
+  private static final Log LOG =
+      LogFactory.getLog(LinuxResourceCalculatorPlugin.class);
+
+  /**
+   * proc's meminfo virtual file has keys-values in the format
+   * "key:[ \t]*value[ \t]kB".
+   */
+  private static final String PROCFS_MEMFILE = "/proc/meminfo";
+  private static final Pattern PROCFS_MEMFILE_FORMAT =
+      Pattern.compile("^([a-zA-Z]*):[ \t]*([0-9]*)[ \t]kB");
+
+  // We need the values for the following keys in meminfo
+  private static final String MEMTOTAL_STRING = "MemTotal";
+  private static final String SWAPTOTAL_STRING = "SwapTotal";
+  private static final String MEMFREE_STRING = "MemFree";
+  private static final String SWAPFREE_STRING = "SwapFree";
+  private static final String INACTIVE_STRING = "Inactive";
+  private static final int UNAVAILABLE = -1;
+
+  /**
+   * Patterns for parsing /proc/cpuinfo
+   */
+  private static final String PROCFS_CPUINFO = "/proc/cpuinfo";
+  private static final Pattern PROCESSOR_FORMAT =
+      Pattern.compile("^processor[ \t]:[ \t]*([0-9]*)");
+  private static final Pattern FREQUENCY_FORMAT =
+      Pattern.compile("^cpu MHz[ \t]*:[ \t]*([0-9.]*)");
+
+  /**
+   * Pattern for parsing /proc/stat
+   */
+  private static final String PROCFS_STAT = "/proc/stat";
+  private static final Pattern CPU_TIME_FORMAT =
+    Pattern.compile("^cpu[ \t]*([0-9]*)" +
+    		            "[ \t]*([0-9]*)[ \t]*([0-9]*)[ \t].*");
+  
+  private String procfsMemFile;
+  private String procfsCpuFile;
+  private String procfsStatFile;
+  long jiffyLengthInMillis;
+
+  private long ramSize = 0;
+  private long swapSize = 0;
+  private long ramSizeFree = 0;  // free ram space on the machine (kB)
+  private long swapSizeFree = 0; // free swap space on the machine (kB)
+  private long inactiveSize = 0; // inactive cache memory (kB)
+  private int numProcessors = 0; // number of processors on the system
+  private long cpuFrequency = 0L; // CPU frequency on the system (kHz)
+  private long cumulativeCpuTime = 0L; // CPU used time since system is on (ms)
+  private long lastCumulativeCpuTime = 0L; // CPU used time read last time (ms)
+  // CPU usage (%); the two sample times below are Unix timestamps (ms)
+  private float cpuUsage = UNAVAILABLE;
+  private long sampleTime = UNAVAILABLE;
+  private long lastSampleTime = UNAVAILABLE;
+  private ProcfsBasedProcessTree pTree = null;
+
+  boolean readMemInfoFile = false;
+  boolean readCpuInfoFile = false;
+  
+  /**
+   * Get current time
+   * @return Unix timestamp in milliseconds
+   */
+  long getCurrentTime() {
+    return System.currentTimeMillis();
+  }
+  
+  public LinuxResourceCalculatorPlugin() {
+    procfsMemFile = PROCFS_MEMFILE;
+    procfsCpuFile = PROCFS_CPUINFO;
+    procfsStatFile = PROCFS_STAT;
+    jiffyLengthInMillis = ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS;
+    String pid = System.getenv().get("JVM_PID");
+    pTree = new ProcfsBasedProcessTree(pid);
+  }
+  
+  /**
+   * Constructor which allows assigning the /proc/ directories. This will be
+   * used only in unit tests
+   * @param procfsMemFile fake file for /proc/meminfo
+   * @param procfsCpuFile fake file for /proc/cpuinfo
+   * @param procfsStatFile fake file for /proc/stat
+   * @param jiffyLengthInMillis fake jiffy length value
+   */
+  public LinuxResourceCalculatorPlugin(String procfsMemFile,
+                                       String procfsCpuFile,
+                                       String procfsStatFile,
+                                       long jiffyLengthInMillis) {
+    this.procfsMemFile = procfsMemFile;
+    this.procfsCpuFile = procfsCpuFile;
+    this.procfsStatFile = procfsStatFile;
+    this.jiffyLengthInMillis = jiffyLengthInMillis;
+    String pid = System.getenv().get("JVM_PID");
+    pTree = new ProcfsBasedProcessTree(pid);
+  }
+
+  /**
+   * Read /proc/meminfo, parse and compute memory information only once
+   */
+  private void readProcMemInfoFile() {
+    readProcMemInfoFile(false);
+  }
+
+  /**
+   * Read /proc/meminfo, parse and compute memory information
+   * @param readAgain if false, read only on the first time
+   */
+  private void readProcMemInfoFile(boolean readAgain) {
+
+    if (readMemInfoFile && !readAgain) {
+      return;
+    }
+
+    // Read the "/proc/meminfo" file
+    BufferedReader in = null;
+    FileReader fReader = null;
+    try {
+      fReader = new FileReader(procfsMemFile);
+      in = new BufferedReader(fReader);
+    } catch (FileNotFoundException f) {
+      // shouldn't happen....
+      return;
+    }
+
+    Matcher mat = null;
+
+    try {
+      String str = in.readLine();
+      while (str != null) {
+        mat = PROCFS_MEMFILE_FORMAT.matcher(str);
+        if (mat.find()) {
+          if (mat.group(1).equals(MEMTOTAL_STRING)) {
+            ramSize = Long.parseLong(mat.group(2));
+          } else if (mat.group(1).equals(SWAPTOTAL_STRING)) {
+            swapSize = Long.parseLong(mat.group(2));
+          } else if (mat.group(1).equals(MEMFREE_STRING)) {
+            ramSizeFree = Long.parseLong(mat.group(2));
+          } else if (mat.group(1).equals(SWAPFREE_STRING)) {
+            swapSizeFree = Long.parseLong(mat.group(2));
+          } else if (mat.group(1).equals(INACTIVE_STRING)) {
+            inactiveSize = Long.parseLong(mat.group(2));
+          }
+        }
+        str = in.readLine();
+      }
+    } catch (IOException io) {
+      LOG.warn("Error reading the stream " + io);
+    } finally {
+      // Close the streams
+      try {
+        fReader.close();
+        try {
+          in.close();
+        } catch (IOException i) {
+          LOG.warn("Error closing the stream " + in);
+        }
+      } catch (IOException i) {
+        LOG.warn("Error closing the stream " + fReader);
+      }
+    }
+
+    readMemInfoFile = true;
+  }
+
+  /**
+   * Read /proc/cpuinfo, parse and calculate CPU information
+   */
+  private void readProcCpuInfoFile() {
+    // This file needs to be read only once
+    if (readCpuInfoFile) {
+      return;
+    }
+    // Read "/proc/cpuinfo" file
+    BufferedReader in = null;
+    FileReader fReader = null;
+    try {
+      fReader = new FileReader(procfsCpuFile);
+      in = new BufferedReader(fReader);
+    } catch (FileNotFoundException f) {
+      // shouldn't happen....
+      return;
+    }
+    Matcher mat = null;
+    try {
+      numProcessors = 0;
+      String str = in.readLine();
+      while (str != null) {
+        mat = PROCESSOR_FORMAT.matcher(str);
+        if (mat.find()) {
+          numProcessors++;
+        }
+        mat = FREQUENCY_FORMAT.matcher(str);
+        if (mat.find()) {
+          cpuFrequency = (long)(Double.parseDouble(mat.group(1)) * 1000); // kHz
+        }
+        str = in.readLine();
+      }
+    } catch (IOException io) {
+      LOG.warn("Error reading the stream " + io);
+    } finally {
+      // Close the streams
+      try {
+        fReader.close();
+        try {
+          in.close();
+        } catch (IOException i) {
+          LOG.warn("Error closing the stream " + in);
+        }
+      } catch (IOException i) {
+        LOG.warn("Error closing the stream " + fReader);
+      }
+    }
+    readCpuInfoFile = true;
+  }
+
+  /**
+   * Read /proc/stat file, parse and calculate cumulative CPU
+   */
+  private void readProcStatFile() {
+    // Read "/proc/stat" file
+    BufferedReader in = null;
+    FileReader fReader = null;
+    try {
+      fReader = new FileReader(procfsStatFile);
+      in = new BufferedReader(fReader);
+    } catch (FileNotFoundException f) {
+      // shouldn't happen....
+      return;
+    }
+
+    Matcher mat = null;
+    try {
+      String str = in.readLine();
+      while (str != null) {
+        mat = CPU_TIME_FORMAT.matcher(str);
+        if (mat.find()) {
+          long uTime = Long.parseLong(mat.group(1));
+          long nTime = Long.parseLong(mat.group(2));
+          long sTime = Long.parseLong(mat.group(3));
+          cumulativeCpuTime = uTime + nTime + sTime; // jiffies; scaled to ms below
+          break;
+        }
+        str = in.readLine();
+      }
+      cumulativeCpuTime *= jiffyLengthInMillis;
+    } catch (IOException io) {
+      LOG.warn("Error reading the stream " + io);
+    } finally {
+      // Close the streams
+      try {
+        fReader.close();
+        try {
+          in.close();
+        } catch (IOException i) {
+          LOG.warn("Error closing the stream " + in);
+        }
+      } catch (IOException i) {
+        LOG.warn("Error closing the stream " + fReader);
+      }
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getPhysicalMemorySize() {
+    readProcMemInfoFile();
+    return ramSize * 1024;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getVirtualMemorySize() {
+    readProcMemInfoFile();
+    return (ramSize + swapSize) * 1024;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getAvailablePhysicalMemorySize() {
+    readProcMemInfoFile(true);
+    return (ramSizeFree + inactiveSize) * 1024;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getAvailableVirtualMemorySize() {
+    readProcMemInfoFile(true);
+    return (ramSizeFree + swapSizeFree + inactiveSize) * 1024;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public int getNumProcessors() {
+    readProcCpuInfoFile();
+    return numProcessors;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getCpuFrequency() {
+    readProcCpuInfoFile();
+    return cpuFrequency;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getCumulativeCpuTime() {
+    readProcStatFile();
+    return cumulativeCpuTime;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public float getCpuUsage() {
+    readProcStatFile();
+    sampleTime = getCurrentTime();
+    if (lastSampleTime == UNAVAILABLE ||
+        lastSampleTime > sampleTime) {
+      // lastSampleTime > sampleTime may happen when the system time is changed
+      lastSampleTime = sampleTime;
+      lastCumulativeCpuTime = cumulativeCpuTime;
+      return cpuUsage;
+    }
+    // When lastSampleTime is sufficiently old, update cpuUsage.
+    // Also take a sample of the current time and cumulative CPU time for the
+    // use of the next calculation.
+    final long MINIMUM_UPDATE_INTERVAL = 10 * jiffyLengthInMillis;
+    if (sampleTime > lastSampleTime + MINIMUM_UPDATE_INTERVAL) {
+	    cpuUsage = (float)(cumulativeCpuTime - lastCumulativeCpuTime) * 100F /
+	               ((float)(sampleTime - lastSampleTime) * getNumProcessors());
+	    lastSampleTime = sampleTime;
+      lastCumulativeCpuTime = cumulativeCpuTime;
+    }
+    return cpuUsage;
+  }
+
+  /**
+   * Test the {@link LinuxResourceCalculatorPlugin}
+   *
+   * @param args
+   */
+  public static void main(String[] args) {
+    LinuxResourceCalculatorPlugin plugin = new LinuxResourceCalculatorPlugin();
+    System.out.println("Physical memory Size (bytes) : "
+        + plugin.getPhysicalMemorySize());
+    System.out.println("Total Virtual memory Size (bytes) : "
+        + plugin.getVirtualMemorySize());
+    System.out.println("Available Physical memory Size (bytes) : "
+        + plugin.getAvailablePhysicalMemorySize());
+    System.out.println("Total Available Virtual memory Size (bytes) : "
+        + plugin.getAvailableVirtualMemorySize());
+    System.out.println("Number of Processors : " + plugin.getNumProcessors());
+    System.out.println("CPU frequency (kHz) : " + plugin.getCpuFrequency());
+    System.out.println("Cumulative CPU time (ms) : " +
+            plugin.getCumulativeCpuTime());
+    try {
+      // Sleep so we can compute the CPU usage
+      Thread.sleep(500L);
+    } catch (InterruptedException e) {
+      // do nothing
+    }
+    System.out.println("CPU usage % : " + plugin.getCpuUsage());
+  }
+
+  @Override
+  public ProcResourceValues getProcResourceValues() {
+    pTree = pTree.getProcessTree();
+    long cpuTime = pTree.getCumulativeCpuTime();
+    long pMem = pTree.getCumulativeRssmem();
+    long vMem = pTree.getCumulativeVmem();
+    return new ProcResourceValues(cpuTime, pMem, vMem);
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/MemoryCalculatorPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/MemoryCalculatorPlugin.java?rev=1179465&r1=1179464&r2=1179465&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/MemoryCalculatorPlugin.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/MemoryCalculatorPlugin.java Wed Oct  5 22:37:52 2011
@@ -24,7 +24,11 @@ import org.apache.hadoop.conf.Configured
 /**
  * Plugin to calculate virtual and physical memories on the system.
  * 
+ * @deprecated Use
+ *             {@link org.apache.hadoop.util.ResourceCalculatorPlugin}
+ *             instead
  */
+@Deprecated
 public abstract class MemoryCalculatorPlugin extends Configured {
 
   /**

Modified: hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java?rev=1179465&r1=1179464&r2=1179465&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java Wed Oct  5 22:37:52 2011
@@ -33,7 +33,7 @@ import java.util.LinkedList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
 /**
  * A Proc file-system based ProcessTree. Works only on Linux.
@@ -44,17 +44,49 @@ public class ProcfsBasedProcessTree exte
       .getLog(ProcfsBasedProcessTree.class);
 
   private static final String PROCFS = "/proc/";
-  private static final Pattern PROCFS_STAT_FILE_FORMAT = Pattern
-      .compile("^([0-9-]+)\\s([^\\s]+)\\s[^\\s]\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+\\s){16}([0-9]+)(\\s[0-9-]+){16}");
+  private static final Pattern PROCFS_STAT_FILE_FORMAT = Pattern .compile(
+    "^([0-9-]+)\\s([^\\s]+)\\s[^\\s]\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+)\\s" +
+    "([0-9-]+\\s){7}([0-9]+)\\s([0-9]+)\\s([0-9-]+\\s){7}([0-9]+)\\s([0-9]+)" +
+    "(\\s[0-9-]+){15}");
 
   static final String PROCFS_STAT_FILE = "stat";
   static final String PROCFS_CMDLINE_FILE = "cmdline";
+  public static final long PAGE_SIZE;
+  static {
+    ShellCommandExecutor shellExecutor =
+            new ShellCommandExecutor(new String[]{"getconf",  "PAGESIZE"});
+    long pageSize = -1;
+    try {
+      shellExecutor.execute();
+      pageSize = Long.parseLong(shellExecutor.getOutput().replace("\n", ""));
+    } catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+    } finally {
+      PAGE_SIZE = pageSize;
+    }
+  }
+  public static final long JIFFY_LENGTH_IN_MILLIS; // in millisecond
+  static {
+    ShellCommandExecutor shellExecutor =
+            new ShellCommandExecutor(new String[]{"getconf",  "CLK_TCK"});
+    long jiffiesPerSecond = -1;
+    try {
+      shellExecutor.execute();
+      jiffiesPerSecond = Long.parseLong(shellExecutor.getOutput().replace("\n", ""));
+    } catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+    } finally {
+      JIFFY_LENGTH_IN_MILLIS = jiffiesPerSecond != -1 ?
+                     Math.round(1000D / jiffiesPerSecond) : -1;
+    }
+  }
 
   // to enable testing, using this variable which can be configured
   // to a test directory.
   private String procfsDir;
   
   private Integer pid = -1;
+  private Long cpuTime = 0L;
 
   private Map<Integer, ProcessInfo> processTree = new HashMap<Integer, ProcessInfo>();
 
@@ -149,11 +181,12 @@ public class ProcfsBasedProcessTree exte
         pInfoQueue.addAll(pInfo.getChildren());
       }
 
-      // update age values.
+      // update age values and compute the number of jiffies since last update
       for (Map.Entry<Integer, ProcessInfo> procs : processTree.entrySet()) {
         ProcessInfo oldInfo = oldProcs.get(procs.getKey());
-        if (oldInfo != null) {
-          if (procs.getValue() != null) {
+        if (procs.getValue() != null) {
+          procs.getValue().updateJiffy(oldInfo);
+          if (oldInfo != null) {
             procs.getValue().updateAge(oldInfo);  
           }
         }
@@ -196,7 +229,7 @@ public class ProcfsBasedProcessTree exte
   }
 
   private static final String PROCESSTREE_DUMP_FORMAT =
-      "\t|- %d %d %d %d %s %d %s\n";
+      "\t|- %d %d %d %d %s %d %d %d %d %s\n";
 
   /**
    * Get a dump of the process-tree.
@@ -208,12 +241,14 @@ public class ProcfsBasedProcessTree exte
     StringBuilder ret = new StringBuilder();
     // The header.
     ret.append(String.format("\t|- PID PPID PGRPID SESSID CMD_NAME "
-        + "VMEM_USAGE(BYTES) FULL_CMD_LINE\n"));
+        + "USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) "
+        + "RSSMEM_USAGE(PAGES) FULL_CMD_LINE\n"));
     for (ProcessInfo p : processTree.values()) {
       if (p != null) {
         ret.append(String.format(PROCESSTREE_DUMP_FORMAT, p.getPid(), p
             .getPpid(), p.getPgrpId(), p.getSessionId(), p.getName(), p
-            .getVmem(), p.getCmdLine(procfsDir)));
+            .getUtime(), p.getStime(), p.getVmem(), p.getRssmemPage(), p
+            .getCmdLine(procfsDir)));
       }
     }
     return ret.toString();
@@ -231,6 +266,18 @@ public class ProcfsBasedProcessTree exte
   }
 
   /**
+   * Get the cumulative resident set size (rss) memory used by all the processes
+   * in the process-tree.
+   *
+   * @return cumulative rss memory used by the process-tree in bytes. return 0
+   *         if it cannot be calculated
+   */
+  public long getCumulativeRssmem() {
+    // include all processes.. all processes will be older than 0.
+    return getCumulativeRssmem(0);
+  }
+
+  /**
    * Get the cumulative virtual memory used by all the processes in the
    * process-tree that are older than the passed in age.
    * 
@@ -249,6 +296,50 @@ public class ProcfsBasedProcessTree exte
     return total;
   }
 
+  /**
+   * Get the cumulative resident set size (rss) memory used by all the processes
+   * in the process-tree that are older than the passed in age.
+   *
+   * @param olderThanAge processes above this age are included in the
+   *                      memory addition
+   * @return cumulative rss memory used by the process-tree in bytes,
+   *          for processes older than this age. return 0 if it cannot be
+   *          calculated
+   */
+  public long getCumulativeRssmem(int olderThanAge) {
+    if (PAGE_SIZE < 0) {
+      return 0;
+    }
+    long totalPages = 0;
+    for (ProcessInfo p : processTree.values()) {
+      if ((p != null) && (p.getAge() > olderThanAge)) {
+        totalPages += p.getRssmemPage();
+      }
+    }
+    return totalPages * PAGE_SIZE; // convert # pages to byte
+  }
+
+  /**
+   * Get the CPU time in milliseconds used by all the processes in the
+   * process-tree since the process-tree was created.
+   *
+   * @return cumulative CPU time in milliseconds since the process-tree was
+   *         created; returns 0 if it cannot be calculated
+   */
+  public long getCumulativeCpuTime() {
+    if (JIFFY_LENGTH_IN_MILLIS < 0) {
+      return 0;
+    }
+    long incJiffies = 0;
+    for (ProcessInfo p : processTree.values()) {
+      if (p != null) {
+        incJiffies += p.dtime;
+      }
+    }
+    cpuTime += incJiffies * JIFFY_LENGTH_IN_MILLIS;
+    return cpuTime;
+  }
+
   private static Integer getValidPID(String pid) {
     Integer retPid = -1;
     try {
@@ -318,10 +409,11 @@ public class ProcfsBasedProcessTree exte
       Matcher m = PROCFS_STAT_FILE_FORMAT.matcher(str);
       boolean mat = m.find();
       if (mat) {
-        // Set ( name ) ( ppid ) ( pgrpId ) (session ) (vsize )
-        pinfo.updateProcessInfo(m.group(2), Integer.parseInt(m.group(3)), Integer
-            .parseInt(m.group(4)), Integer.parseInt(m.group(5)), Long
-            .parseLong(m.group(7)));
+        // Set (name) (ppid) (pgrpId) (session) (utime) (stime) (vsize) (rss)
+         pinfo.updateProcessInfo(m.group(2), Integer.parseInt(m.group(3)),
+                 Integer.parseInt(m.group(4)), Integer.parseInt(m.group(5)),
+                 Long.parseLong(m.group(7)), Long.parseLong(m.group(8)),
+                 Long.parseLong(m.group(10)), Long.parseLong(m.group(11)));
       }
     } catch (IOException io) {
       LOG.warn("Error reading the stream " + io);
@@ -368,8 +460,18 @@ public class ProcfsBasedProcessTree exte
     private Integer ppid; // parent process-id
     private Integer sessionId; // session-id
     private Long vmem; // virtual memory usage
+    private Long rssmemPage; // rss memory usage in # of pages
+    private Long utime = 0L; // # of jiffies in user mode
+    private Long stime = 0L; // # of jiffies in kernel mode
     // how many times has this process been seen alive
     private int age; 
+
+    // # of jiffies used since last update:
+    private Long dtime = 0L;
+    // dtime = (utime + stime) - (utimeOld + stimeOld)
+    // We need this to compute the cumulative CPU time
+    // because the subprocess may finish earlier than root process
+
     private List<ProcessInfo> children = new ArrayList<ProcessInfo>(); // list of children
 
     public ProcessInfo(int pid) {
@@ -402,17 +504,41 @@ public class ProcfsBasedProcessTree exte
       return vmem;
     }
 
+    public Long getUtime() {
+      return utime;
+    }
+
+    public Long getStime() {
+      return stime;
+    }
+
+    public Long getDtime() {
+      return dtime;
+    }
+
+    public Long getRssmemPage() { // get rss # of pages
+      return rssmemPage;
+    }
+
     public int getAge() {
       return age;
     }
     
     public void updateProcessInfo(String name, Integer ppid, Integer pgrpId,
-        Integer sessionId, Long vmem) {
+        Integer sessionId, Long utime, Long stime, Long vmem, Long rssmem) {
       this.name = name;
       this.ppid = ppid;
       this.pgrpId = pgrpId;
       this.sessionId = sessionId;
+      this.utime = utime;
+      this.stime = stime;
       this.vmem = vmem;
+      this.rssmemPage = rssmem;
+    }
+
+    public void updateJiffy(ProcessInfo oldInfo) {
+      this.dtime = (oldInfo == null ? this.utime + this.stime
+              : (this.utime + this.stime) - (oldInfo.utime + oldInfo.stime));
     }
 
     public void updateAge(ProcessInfo oldInfo) {

Added: hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/ResourceCalculatorPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/ResourceCalculatorPlugin.java?rev=1179465&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/ResourceCalculatorPlugin.java (added)
+++ hadoop/common/branches/branch-0.20-security-205/src/core/org/apache/hadoop/util/ResourceCalculatorPlugin.java Wed Oct  5 22:37:52 2011
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Plugin to calculate resource information on the system.
+ * 
+ */
+public abstract class ResourceCalculatorPlugin extends Configured {
+
+  /**
+   * Obtain the total size of the virtual memory present in the system.
+   * 
+   * @return virtual memory size in bytes.
+   */
+  public abstract long getVirtualMemorySize();
+
+  /**
+   * Obtain the total size of the physical memory present in the system.
+   * 
+   * @return physical memory size in bytes.
+   */
+  public abstract long getPhysicalMemorySize();
+
+  /**
+   * Obtain the total size of the available virtual memory present
+   * in the system.
+   *
+   * @return available virtual memory size in bytes.
+   */
+  public abstract long getAvailableVirtualMemorySize();
+
+  /**
+   * Obtain the total size of the available physical memory present
+   * in the system.
+   *
+   * @return available physical memory size in bytes.
+   */
+  public abstract long getAvailablePhysicalMemorySize();
+
+  /**
+   * Obtain the total number of processors present on the system.
+   *
+   * @return number of processors
+   */
+  public abstract int getNumProcessors();
+
+  /**
+   * Obtain the CPU frequency of the system.
+   *
+   * @return CPU frequency in kHz
+   */
+  public abstract long getCpuFrequency();
+
+  /**
+   * Obtain the cumulative CPU time since the system was started.
+   *
+   * @return cumulative CPU time in milliseconds
+   */
+  public abstract long getCumulativeCpuTime();
+
+  /**
+   * Obtain the CPU usage % of the machine. Return -1 if it is unavailable
+   *
+   * @return CPU usage in %
+   */
+  public abstract float getCpuUsage();
+
+  /**
+   * Obtain resource status used by current process tree.
+   */
+  public abstract ProcResourceValues getProcResourceValues();
+
+  public static class ProcResourceValues {
+    private final long cumulativeCpuTime;
+    private final long physicalMemorySize;
+    private final long virtualMemorySize;
+    public ProcResourceValues(long cumulativeCpuTime, long physicalMemorySize,
+                              long virtualMemorySize) {
+      this.cumulativeCpuTime = cumulativeCpuTime;
+      this.physicalMemorySize = physicalMemorySize;
+      this.virtualMemorySize = virtualMemorySize;
+    }
+    /**
+     * Obtain the physical memory size used by current process tree.
+     * @return physical memory size in bytes.
+     */
+    public long getPhysicalMemorySize() {
+      return physicalMemorySize;
+    }
+
+    /**
+     * Obtain the virtual memory size used by the current process tree.
+     * @return virtual memory size in bytes.
+     */
+    public long getVirtualMemorySize() {
+      return virtualMemorySize;
+    }
+
+    /**
+     * Obtain the cumulative CPU time used by the current process tree.
+     * @return cumulative CPU time in milliseconds
+     */
+    public long getCumulativeCpuTime() {
+      return cumulativeCpuTime;
+    }
+  }
+
+  /**
+   * Get the ResourceCalculatorPlugin from the class name and configure it. If
+   * class name is null, this method will try to return a resource calculator
+   * plugin available for this system, or null if the system is not supported.
+   * 
+   * @param clazz class-name
+   * @param conf configure the plugin with this.
+   * @return ResourceCalculatorPlugin
+   */
+  public static ResourceCalculatorPlugin getResourceCalculatorPlugin(
+      Class<? extends ResourceCalculatorPlugin> clazz, Configuration conf) {
+
+    if (clazz != null) {
+      return ReflectionUtils.newInstance(clazz, conf);
+    }
+
+    // No class given, try an OS-specific class
+    try {
+      String osName = System.getProperty("os.name");
+      if (osName.startsWith("Linux")) {
+        return new LinuxResourceCalculatorPlugin();
+      }
+    } catch (SecurityException se) {
+      // Failed to get Operating System name.
+      return null;
+    }
+
+    // Not supported on this system.
+    return null;
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-205/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/mapred/mapred-default.xml?rev=1179465&r1=1179464&r2=1179465&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/mapred/mapred-default.xml Wed Oct  5 22:37:52 2011
@@ -186,14 +186,14 @@
 -->
 
 <property>
-  <name>mapred.tasktracker.memory_calculator_plugin</name>
+  <name>mapred.tasktracker.resourcecalculatorplugin</name>
   <value></value>
   <description>
-   Name of the class whose instance will be used to query memory information
+   Name of the class whose instance will be used to query resource information
    on the tasktracker.
    
    The class must be an instance of 
-   org.apache.hadoop.util.MemoryCalculatorPlugin. If the value is null, the
+   org.apache.hadoop.util.ResourceCalculatorPlugin. If the value is null, the
    tasktracker attempts to use a class appropriate to the platform. 
    Currently, the only platform supported is Linux.
   </description>

Modified: hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=1179465&r1=1179464&r2=1179465&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java Wed Oct  5 22:37:52 2011
@@ -69,8 +69,10 @@ interface InterTrackerProtocol extends V
    * Version 26: Added numRequiredSlots to TaskStatus for MAPREDUCE-516
    * Version 27: Adding node health status to TaskStatus for MAPREDUCE-211
    * Version 28: Adding user name to the serialized Task for use by TT.
+   * Version 29: Adding available memory and CPU usage information on TT to
+   *             TaskTrackerStatus for MAPREDUCE-1218
    */
-  public static final long versionID = 28L;
+  public static final long versionID = 29L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;

Modified: hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1179465&r1=1179464&r2=1179465&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/JobTracker.java Wed Oct  5 22:37:52 2011
@@ -4468,9 +4468,9 @@ public class JobTracker implements MRCon
     }
   }
   
-  static final String MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY =
+  public static final String MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY =
       "mapred.cluster.map.memory.mb";
-  static final String MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY =
+  public static final String MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY =
       "mapred.cluster.reduce.memory.mb";
 
   static final String MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY =

Modified: hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1179465&r1=1179464&r2=1179465&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/Task.java Wed Oct  5 22:37:52 2011
@@ -50,10 +50,11 @@ import org.apache.hadoop.io.serializer.S
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.ResourceCalculatorPlugin;
+import org.apache.hadoop.util.ResourceCalculatorPlugin.ProcResourceValues;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.fs.FSDataInputStream;
 
@@ -85,7 +86,11 @@ abstract public class Task implements Wr
     REDUCE_SKIPPED_GROUPS,
     REDUCE_SKIPPED_RECORDS,
     SPILLED_RECORDS,
-    SPLIT_RAW_BYTES
+    SPLIT_RAW_BYTES,
+    CPU_MILLISECONDS,
+    PHYSICAL_MEMORY_BYTES,
+    VIRTUAL_MEMORY_BYTES,
+    COMMITTED_HEAP_BYTES
   }
   
   /**
@@ -143,6 +148,9 @@ abstract public class Task implements Wr
   private Iterator<Long> currentRecIndexIterator = 
     skipRanges.skipRangeIterator();
   
+  private ResourceCalculatorPlugin resourceCalculator = null;
+  private long initCpuCumulativeTime = 0;
+
   protected JobConf conf;
   protected MapOutputFile mapOutputFile = new MapOutputFile();
   protected LocalDirAllocator lDirAlloc;
@@ -518,6 +526,16 @@ abstract public class Task implements Wr
       }
     }
     committer.setupTask(taskContext);
+    Class<? extends ResourceCalculatorPlugin> clazz =
+        conf.getClass(TaskTracker.TT_RESOURCE_CALCULATOR_PLUGIN,
+            null, ResourceCalculatorPlugin.class);
+    resourceCalculator = ResourceCalculatorPlugin
+            .getResourceCalculatorPlugin(clazz, conf);
+    LOG.info(" Using ResourceCalculatorPlugin : " + resourceCalculator);
+    if (resourceCalculator != null) {
+      initCpuCumulativeTime =
+        resourceCalculator.getProcResourceValues().getCumulativeCpuTime();
+    }
   }
   
   protected class TaskReporter 
@@ -698,6 +716,7 @@ abstract public class Task implements Wr
       }
     }
     public void stopCommunicationThread() throws InterruptedException {
+      // Updating resources specified in ResourceCalculatorPlugin
       if (pingThread != null) {
         synchronized(lock) {
           while(!done) {
@@ -776,6 +795,27 @@ abstract public class Task implements Wr
   private Map<String, FileSystemStatisticUpdater> statisticUpdaters =
      new HashMap<String, FileSystemStatisticUpdater>();
   
+  /**
+   * Update resource information counters
+   */
+   void updateResourceCounters() {
+     // Update generic resource counters
+     updateHeapUsageCounter();
+     
+     if (resourceCalculator == null) {
+       return;
+     }
+     ProcResourceValues res = resourceCalculator.getProcResourceValues();
+     long cpuTime = res.getCumulativeCpuTime();
+     long pMem = res.getPhysicalMemorySize();
+     long vMem = res.getVirtualMemorySize();
+     // Remove the CPU time consumed previously by JVM reuse
+     cpuTime -= initCpuCumulativeTime;
+     counters.findCounter(Counter.CPU_MILLISECONDS).setValue(cpuTime);
+     counters.findCounter(Counter.PHYSICAL_MEMORY_BYTES).setValue(pMem);
+     counters.findCounter(Counter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
+   }
+  
   private synchronized void updateCounters() {
     for(Statistics stat: FileSystem.getAllStatistics()) {
       String uriScheme = stat.getScheme();
@@ -786,6 +826,19 @@ abstract public class Task implements Wr
       }
       updater.updateCounters();      
     }
+    // TODO Should CPU related counters be updated only once, i.e. at the end?
+    updateResourceCounters();
+  }
+
+  /**
+   * Updates the {@link Counter#COMMITTED_HEAP_BYTES} counter to reflect the
+   * current total committed heap space usage of this JVM.
+   */
+  @SuppressWarnings("deprecation")
+  private void updateHeapUsageCounter() {
+    long currentHeapUsage = Runtime.getRuntime().totalMemory();
+    counters.findCounter(Counter.COMMITTED_HEAP_BYTES)
+            .setValue(currentHeapUsage);
   }
 
   public void done(TaskUmbilicalProtocol umbilical,

Modified: hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1179465&r1=1179464&r2=1179465&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Wed Oct  5 22:37:52 2011
@@ -92,6 +92,7 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.MemoryCalculatorPlugin;
+import org.apache.hadoop.util.ResourceCalculatorPlugin;
 import org.apache.hadoop.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -347,11 +348,14 @@ public class TaskTracker implements MRCo
   private long mapSlotMemorySizeOnTT = JobConf.DISABLED_MEMORY_LIMIT;
   private long reduceSlotSizeMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
   private long totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT;
+  private ResourceCalculatorPlugin resourceCalculatorPlugin = null;
 
   private UserLogManager userLogManager;
 
   static final String MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY =
       "mapred.tasktracker.memory_calculator_plugin";
+  public static final String TT_RESOURCE_CALCULATOR_PLUGIN = 
+      "mapreduce.tasktracker.resourcecalculatorplugin";
 
   /**
    * the minimum interval between jobtracker polls
@@ -802,6 +806,12 @@ public class TaskTracker implements MRCo
                              taskTrackerName);
     mapEventsFetcher.start();
 
+    Class<? extends ResourceCalculatorPlugin> clazz =
+        fConf.getClass(TT_RESOURCE_CALCULATOR_PLUGIN,
+                       null, ResourceCalculatorPlugin.class);
+    resourceCalculatorPlugin = 
+      ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, fConf);
+    LOG.info(" Using ResourceCalculatorPlugin : " + resourceCalculatorPlugin);
     initializeMemoryManagement();
 
     getUserLogManager().clearOldUserLogs(fConf);
@@ -1739,6 +1749,12 @@ public class TaskTracker implements MRCo
       long freeDiskSpace = getFreeSpace();
       long totVmem = getTotalVirtualMemoryOnTT();
       long totPmem = getTotalPhysicalMemoryOnTT();
+      long availableVmem = getAvailableVirtualMemoryOnTT();
+      long availablePmem = getAvailablePhysicalMemoryOnTT();
+      long cumuCpuTime = getCumulativeCpuTimeOnTT();
+      long cpuFreq = getCpuFrequencyOnTT();
+      int numCpu = getNumProcessorsOnTT();
+      float cpuUsage = getCpuUsageOnTT();
 
       status.getResourceStatus().setAvailableSpace(freeDiskSpace);
       status.getResourceStatus().setTotalVirtualMemory(totVmem);
@@ -1747,6 +1763,12 @@ public class TaskTracker implements MRCo
           mapSlotMemorySizeOnTT);
       status.getResourceStatus().setReduceSlotMemorySizeOnTT(
           reduceSlotSizeMemoryOnTT);
+      status.getResourceStatus().setAvailableVirtualMemory(availableVmem); 
+      status.getResourceStatus().setAvailablePhysicalMemory(availablePmem);
+      status.getResourceStatus().setCumulativeCpuTime(cumuCpuTime);
+      status.getResourceStatus().setCpuFrequency(cpuFreq);
+      status.getResourceStatus().setNumProcessors(numCpu);
+      status.getResourceStatus().setCpuUsage(cpuUsage);
     }
     //add node health information
     
@@ -1819,6 +1841,80 @@ public class TaskTracker implements MRCo
     return totalPhysicalMemoryOnTT;
   }
 
+  /**
+   * Return the free virtual memory available on this TaskTracker.
+   * @return total size of free virtual memory in bytes
+   */
+  long getAvailableVirtualMemoryOnTT() {
+    long availableVirtualMemoryOnTT = TaskTrackerStatus.UNAVAILABLE;
+    if (resourceCalculatorPlugin != null) {
+      availableVirtualMemoryOnTT =
+              resourceCalculatorPlugin.getAvailableVirtualMemorySize();
+    }
+    return availableVirtualMemoryOnTT;
+  }
+
+  /**
+   * Return the free physical memory available on this TaskTracker.
+   * @return total size of free physical memory in bytes
+   */
+  long getAvailablePhysicalMemoryOnTT() {
+    long availablePhysicalMemoryOnTT = TaskTrackerStatus.UNAVAILABLE;
+    if (resourceCalculatorPlugin != null) {
+      availablePhysicalMemoryOnTT =
+              resourceCalculatorPlugin.getAvailablePhysicalMemorySize();
+    }
+    return availablePhysicalMemoryOnTT;
+  }
+
+  /**
+   * Return the cumulative CPU time used on this TaskTracker since system start
+   * @return cumulative CPU used time in milliseconds
+   */
+  long getCumulativeCpuTimeOnTT() {
+    long cumulativeCpuTime = TaskTrackerStatus.UNAVAILABLE;
+    if (resourceCalculatorPlugin != null) {
+      cumulativeCpuTime = resourceCalculatorPlugin.getCumulativeCpuTime();
+    }
+    return cumulativeCpuTime;
+  }
+
+  /**
+   * Return the number of Processors on this TaskTracker
+   * @return number of processors
+   */
+  int getNumProcessorsOnTT() {
+    int numProcessors = TaskTrackerStatus.UNAVAILABLE;
+    if (resourceCalculatorPlugin != null) {
+      numProcessors = resourceCalculatorPlugin.getNumProcessors();
+    }
+    return numProcessors;
+  }
+
+  /**
+   * Return the CPU frequency of this TaskTracker
+   * @return CPU frequency in kHz
+   */
+  long getCpuFrequencyOnTT() {
+    long cpuFrequency = TaskTrackerStatus.UNAVAILABLE;
+    if (resourceCalculatorPlugin != null) {
+      cpuFrequency = resourceCalculatorPlugin.getCpuFrequency();
+    }
+    return cpuFrequency;
+  }
+
+  /**
+   * Return the CPU usage in % of this TaskTracker
+   * @return CPU usage in %
+   */
+  float getCpuUsageOnTT() {
+    float cpuUsage = TaskTrackerStatus.UNAVAILABLE;
+    if (resourceCalculatorPlugin != null) {
+      cpuUsage = resourceCalculatorPlugin.getCpuUsage();
+    }
+    return cpuUsage;
+  }
+  
   long getTotalMemoryAllottedForTasksOnTT() {
     return totalMemoryAllottedForTasks;
   }
@@ -3974,25 +4070,31 @@ public class TaskTracker implements MRCo
           JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY));
     }
 
-    Class<? extends MemoryCalculatorPlugin> clazz =
-        fConf.getClass(MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
-            null, MemoryCalculatorPlugin.class);
-    MemoryCalculatorPlugin memoryCalculatorPlugin =
-        MemoryCalculatorPlugin
-            .getMemoryCalculatorPlugin(clazz, fConf);
-    LOG.info(" Using MemoryCalculatorPlugin : " + memoryCalculatorPlugin);
-
-    if (memoryCalculatorPlugin != null) {
-      totalVirtualMemoryOnTT = memoryCalculatorPlugin.getVirtualMemorySize();
+    // Prefer the legacy memory plugin if configured, else resourceCalculatorPlugin.
+    Class<? extends MemoryCalculatorPlugin> clazz = 
+      fConf.getClass(MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY, 
+                     null, MemoryCalculatorPlugin.class); 
+    MemoryCalculatorPlugin memoryCalculatorPlugin = 
+      (clazz == null 
+       ? null 
+       : MemoryCalculatorPlugin.getMemoryCalculatorPlugin(clazz, fConf)); 
+    if (memoryCalculatorPlugin != null || resourceCalculatorPlugin != null) {
+      totalVirtualMemoryOnTT = 
+        (memoryCalculatorPlugin == null 
+         ? resourceCalculatorPlugin.getVirtualMemorySize() 
+         : memoryCalculatorPlugin.getVirtualMemorySize());
       if (totalVirtualMemoryOnTT <= 0) {
         LOG.warn("TaskTracker's totalVmem could not be calculated. "
-            + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT);
+                 + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT);
         totalVirtualMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
       }
-      totalPhysicalMemoryOnTT = memoryCalculatorPlugin.getPhysicalMemorySize();
+      totalPhysicalMemoryOnTT = 
+        (memoryCalculatorPlugin == null 
+         ? resourceCalculatorPlugin.getPhysicalMemorySize() 
+         : memoryCalculatorPlugin.getPhysicalMemorySize());
       if (totalPhysicalMemoryOnTT <= 0) {
         LOG.warn("TaskTracker's totalPmem could not be calculated. "
-            + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT);
+                 + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT);
         totalPhysicalMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
       }
     }



Mime
View raw message