Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java Tue Jan 26 14:02:53 2010
@@ -193,7 +193,7 @@
synchronized (localizedUser) {
if (localizedUser.get()) {
- // User-directories are already localized for his user.
+ // User-directories are already localized for this user.
LOG.info("User-directories for the user " + user
+ " are already initialized on this TT. Not doing anything.");
return;
@@ -233,7 +233,7 @@
// Set up the cache directory used for distributed cache files
File distributedCacheDir =
- new File(localDir, TaskTracker.getDistributedCacheDir(user));
+ new File(localDir, TaskTracker.getPrivateDistributedCacheDir(user));
if (distributedCacheDir.exists() || distributedCacheDir.mkdirs()) {
// Set permissions on the distcache-directory
PermissionsHandler.setPermissions(distributedCacheDir,
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java Tue Jan 26 14:02:53 2010
@@ -67,8 +67,14 @@
"mapreduce.tasktracker.instrumentation";
public static final String TT_MAP_SLOTS =
"mapreduce.tasktracker.map.tasks.maximum";
+ /**
+ * @deprecated Use {@link #TT_RESOURCE_CALCULATOR_PLUGIN} instead
+ */
+ @Deprecated
public static final String TT_MEMORY_CALCULATOR_PLUGIN =
"mapreduce.tasktracker.memorycalculatorplugin";
+ public static final String TT_RESOURCE_CALCULATOR_PLUGIN =
+ "mapreduce.tasktracker.resourcecalculatorplugin";
public static final String TT_REDUCE_SLOTS =
"mapreduce.tasktracker.reduce.tasks.maximum";
public static final String TT_MEMORY_MANAGER_MONITORING_INTERVAL =
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Tue Jan 26 14:02:53 2010
@@ -28,6 +28,8 @@
import java.util.List;
import java.util.Set;
+import javax.crypto.SecretKey;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IOUtils;
@@ -41,15 +43,11 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.security.JobTokens;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.commons.codec.binary.Base64;
-import java.security.GeneralSecurityException;
-
class Fetcher<K,V> extends Thread {
private static final Log LOG = LogFactory.getLog(Fetcher.class);
@@ -88,12 +86,12 @@
// Decompression of map-outputs
private final CompressionCodec codec;
private final Decompressor decompressor;
- private final byte[] shuffleJobToken;
+ private final SecretKey jobTokenSecret;
public Fetcher(JobConf job, TaskAttemptID reduceId,
ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger,
Reporter reporter, ShuffleClientMetrics metrics,
- ExceptionReporter exceptionReporter, byte [] shuffleJobToken) {
+ ExceptionReporter exceptionReporter, SecretKey jobTokenSecret) {
this.reporter = reporter;
this.scheduler = scheduler;
this.merger = merger;
@@ -101,7 +99,7 @@
this.exceptionReporter = exceptionReporter;
this.id = ++nextId;
this.reduce = reduceId.getTaskID().getId();
- this.shuffleJobToken = shuffleJobToken;
+ this.jobTokenSecret = jobTokenSecret;
ioErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
ShuffleErrors.IO_ERROR.toString());
wrongLengthErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
@@ -196,9 +194,8 @@
URLConnection connection = url.openConnection();
// generate hash of the url
- SecureShuffleUtils ssutil = new SecureShuffleUtils(shuffleJobToken);
String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
- String encHash = ssutil.hashFromString(msgToEncode);
+ String encHash = SecureShuffleUtils.hashFromString(msgToEncode, jobTokenSecret);
// put url hash into http header
connection.addRequestProperty(
@@ -215,7 +212,7 @@
}
LOG.debug("url="+msgToEncode+";encHash="+encHash+";replyHash="+replyHash);
// verify that replyHash is HMac of encHash
- ssutil.verifyReply(replyHash, encHash);
+ SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
LOG.info("for url="+msgToEncode+" sent hash and receievd reply");
} catch (IOException ie) {
ioErrs.increment(1);
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java Tue Jan 26 14:02:53 2010
@@ -107,7 +107,7 @@
for (int i=0; i < numFetchers; ++i) {
fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
reporter, metrics, this,
- reduceTask.getJobTokens().getShuffleJobToken());
+ reduceTask.getJobTokenSecret());
fetchers[i].start();
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java Tue Jan 26 14:02:53 2010
@@ -37,6 +37,7 @@
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.task.reduce.MapHost.State;
@@ -82,10 +83,12 @@
private int maxMapRuntime = 0;
private int maxFailedUniqueFetches = 5;
+ private int maxFetchFailuresBeforeReporting;
private long totalBytesShuffledTillNow = 0;
private DecimalFormat mbpsFormat = new DecimalFormat("0.00");
+ private boolean reportReadErrorImmediately = true;
public ShuffleScheduler(JobConf job, TaskStatus status,
ExceptionReporter reporter,
@@ -108,6 +111,10 @@
referee.start();
this.maxFailedUniqueFetches = Math.min(totalMaps,
this.maxFailedUniqueFetches);
+ this.maxFetchFailuresBeforeReporting = job.getInt(
+ JobContext.SHUFFLE_FETCH_FAILURES, REPORT_FAILURE_LIMIT);
+ this.reportReadErrorImmediately = job.getBoolean(
+ JobContext.SHUFFLE_NOTIFY_READERROR, true);
}
public synchronized void copySucceeded(TaskAttemptID mapId,
@@ -175,7 +182,6 @@
}
}
- // Notify the JobTracker after every 'reportFailureLimit' failures
checkAndInformJobTracker(failures, mapId, readError);
checkReducerHealth();
@@ -188,9 +194,14 @@
failedShuffleCounter.increment(1);
}
+ // Notify the JobTracker
+ // after every read error, if 'reportReadErrorImmediately' is true or
+ // after every 'maxFetchFailuresBeforeReporting' failures
private void checkAndInformJobTracker(
int failures, TaskAttemptID mapId, boolean readError) {
- if (readError || ((failures % REPORT_FAILURE_LIMIT) == 0)) {
+ if ((reportReadErrorImmediately && readError)
+ || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
+ LOG.info("Reporting fetch failure for " + mapId + " to jobtracker.");
status.addFetchFailedMap((org.apache.hadoop.mapred.TaskAttemptID) mapId);
}
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java Tue Jan 26 14:02:53 2010
@@ -144,7 +144,9 @@
Configuration.addDeprecation("mapred.tasktracker.map.tasks.maximum",
new String[] {TTConfig.TT_MAP_SLOTS});
Configuration.addDeprecation("mapred.tasktracker.memory_calculator_plugin",
- new String[] {TTConfig.TT_MEMORY_CALCULATOR_PLUGIN});
+ new String[] {TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN});
+ Configuration.addDeprecation("mapred.tasktracker.memorycalculatorplugin",
+ new String[] {TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN});
Configuration.addDeprecation("mapred.tasktracker.reduce.tasks.maximum",
new String[] {TTConfig.TT_REDUCE_SLOTS});
Configuration.addDeprecation(
@@ -185,8 +187,6 @@
new String[] {JobContext.QUEUE_NAME});
Configuration.addDeprecation("mapred.job.reuse.jvm.num.tasks",
new String[] {JobContext.JVM_NUMTASKS_TORUN});
- Configuration.addDeprecation("mapred.job.split.file",
- new String[] {JobContext.SPLIT_FILE});
Configuration.addDeprecation("mapred.map.tasks",
new String[] {JobContext.NUM_MAPS});
Configuration.addDeprecation("mapred.max.tracker.failures",
@@ -324,7 +324,7 @@
Configuration.addDeprecation("mapred.skip.map.max.skip.records",
new String[] {JobContext.MAP_SKIP_MAX_RECORDS});
Configuration.addDeprecation("min.num.spills.for.combine",
- new String[] {JobContext.MAP_COMBINE_MIN_SPISS});
+ new String[] {JobContext.MAP_COMBINE_MIN_SPILLS});
Configuration.addDeprecation("mapred.compress.map.output",
new String[] {JobContext.MAP_OUTPUT_COMPRESS});
Configuration.addDeprecation("mapred.map.output.compression.codec",
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/util/LinuxMemoryCalculatorPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/util/LinuxMemoryCalculatorPlugin.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/util/LinuxMemoryCalculatorPlugin.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/util/LinuxMemoryCalculatorPlugin.java Tue Jan 26 14:02:53 2010
@@ -18,115 +18,29 @@
package org.apache.hadoop.mapreduce.util;
-import java.io.BufferedReader;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
/**
* Plugin to calculate virtual and physical memories on Linux systems.
+ * @deprecated
+ * Use {@link org.apache.hadoop.mapreduce.util.LinuxResourceCalculatorPlugin}
+ * instead
*/
+@Deprecated
public class LinuxMemoryCalculatorPlugin extends MemoryCalculatorPlugin {
- private static final Log LOG =
- LogFactory.getLog(LinuxMemoryCalculatorPlugin.class);
-
- /**
- * proc's meminfo virtual file has keys-values in the format
- * "key:[ \t]*value[ \t]kB".
- */
- private static final String PROCFS_MEMFILE = "/proc/meminfo";
- private static final Pattern PROCFS_MEMFILE_FORMAT =
- Pattern.compile("^([a-zA-Z]*):[ \t]*([0-9]*)[ \t]kB");
-
- // We just need the values for the keys MemTotal and SwapTotal
- private static final String MEMTOTAL_STRING = "MemTotal";
- private static final String SWAPTOTAL_STRING = "SwapTotal";
-
- private long ramSize = 0;
- private long swapSize = 0;
-
- boolean readMemInfoFile = false;
-
- private void readProcMemInfoFile() {
-
- if (readMemInfoFile) {
- return;
- }
-
- // Read "/proc/memInfo" file
- BufferedReader in = null;
- FileReader fReader = null;
- try {
- fReader = new FileReader(PROCFS_MEMFILE);
- in = new BufferedReader(fReader);
- } catch (FileNotFoundException f) {
- // shouldn't happen....
- return;
- }
-
- Matcher mat = null;
-
- try {
- String str = in.readLine();
- while (str != null) {
- mat = PROCFS_MEMFILE_FORMAT.matcher(str);
- if (mat.find()) {
- if (mat.group(1).equals(MEMTOTAL_STRING)) {
- ramSize = Long.parseLong(mat.group(2));
- } else if (mat.group(1).equals(SWAPTOTAL_STRING)) {
- swapSize = Long.parseLong(mat.group(2));
- }
- }
- str = in.readLine();
- }
- } catch (IOException io) {
- LOG.warn("Error reading the stream " + io);
- } finally {
- // Close the streams
- try {
- fReader.close();
- try {
- in.close();
- } catch (IOException i) {
- LOG.warn("Error closing the stream " + in);
- }
- } catch (IOException i) {
- LOG.warn("Error closing the stream " + fReader);
- }
- }
-
- readMemInfoFile = true;
+ private LinuxResourceCalculatorPlugin resourceCalculatorPlugin;
+ // Use everything from LinuxResourceCalculatorPlugin
+ public LinuxMemoryCalculatorPlugin() {
+ resourceCalculatorPlugin = new LinuxResourceCalculatorPlugin();
}
-
+
/** {@inheritDoc} */
@Override
public long getPhysicalMemorySize() {
- readProcMemInfoFile();
- return ramSize * 1024;
+ return resourceCalculatorPlugin.getPhysicalMemorySize();
}
-
+
/** {@inheritDoc} */
@Override
public long getVirtualMemorySize() {
- readProcMemInfoFile();
- return (ramSize + swapSize) * 1024;
- }
-
- /**
- * Test the {@link LinuxMemoryCalculatorPlugin}
- *
- * @param args
- */
- public static void main(String[] args) {
- LinuxMemoryCalculatorPlugin plugin = new LinuxMemoryCalculatorPlugin();
- System.out.println("Physical memory Size(bytes) : "
- + plugin.getPhysicalMemorySize());
- System.out.println("Total Virtual memory Size(bytes) : "
- + plugin.getVirtualMemorySize());
+ return resourceCalculatorPlugin.getVirtualMemorySize();
}
}
\ No newline at end of file
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/util/MemoryCalculatorPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/util/MemoryCalculatorPlugin.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/util/MemoryCalculatorPlugin.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/util/MemoryCalculatorPlugin.java Tue Jan 26 14:02:53 2010
@@ -24,8 +24,11 @@
/**
* Plugin to calculate virtual and physical memories on the system.
- *
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin}
+ * instead
*/
+@Deprecated
public abstract class MemoryCalculatorPlugin extends Configured {
/**
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/util/ProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/util/ProcessTree.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/util/ProcessTree.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/util/ProcessTree.java Tue Jan 26 14:02:53 2010
@@ -37,6 +37,15 @@
public static final long DEFAULT_SLEEPTIME_BEFORE_SIGKILL = 5000L;
+ private static final int SIGQUIT = 3;
+ private static final int SIGTERM = 15;
+ private static final int SIGKILL = 9;
+
+ private static final String SIGQUIT_STR = "SIGQUIT";
+ private static final String SIGTERM_STR = "SIGTERM";
+ private static final String SIGKILL_STR = "SIGKILL";
+
+
public static final boolean isSetsidAvailable = isSetsidSupported();
private static boolean isSetsidSupported() {
ShellCommandExecutor shexec = null;
@@ -102,43 +111,78 @@
sigKill(pgrpId, true, sleeptimeBeforeSigkill, inBackground);
}
+
/**
- * Sends terminate signal to the process, allowing it to gracefully exit.
- *
- * @param pid pid of the process to be sent SIGTERM
+ * Send a specified signal to the specified pid
+ *
+ * @param pid the pid of the process [group] to signal.
+ * @param signalNum the signal to send.
+ * @param signalName the human-readable description of the signal
+ * (for logging).
*/
- public static void terminateProcess(String pid) {
+ private static void sendSignal(String pid, int signalNum, String signalName) {
ShellCommandExecutor shexec = null;
try {
- String[] args = { "kill", pid };
+ String[] args = { "kill", "-" + signalNum, pid };
shexec = new ShellCommandExecutor(args);
shexec.execute();
} catch (IOException ioe) {
LOG.warn("Error executing shell command " + ioe);
} finally {
- LOG.info("Killing process " + pid +
- " with SIGTERM. Exit code " + shexec.getExitCode());
+ if (pid.startsWith("-")) {
+ LOG.info("Sending signal to all members of process group " + pid
+ + ": " + signalName + ". Exit code " + shexec.getExitCode());
+ } else {
+ LOG.info("Signaling process " + pid
+ + " with " + signalName + ". Exit code " + shexec.getExitCode());
+ }
}
}
/**
+ * Send a specified signal to the process, if it is alive.
+ *
+ * @param pid the pid of the process to signal.
+ * @param signalNum the signal to send.
+ * @param signalName the human-readable description of the signal
+ * (for logging).
+ * @param alwaysSignal if true then send signal even if isAlive(pid) is false
+ */
+ private static void maybeSignalProcess(String pid, int signalNum,
+ String signalName, boolean alwaysSignal) {
+ // If process tree is not alive then don't signal, unless alwaysSignal
+ // forces it so.
+ if (alwaysSignal || ProcessTree.isAlive(pid)) {
+ sendSignal(pid, signalNum, signalName);
+ }
+ }
+
+ private static void maybeSignalProcessGroup(String pgrpId, int signalNum,
+ String signalName, boolean alwaysSignal) {
+
+ if (alwaysSignal || ProcessTree.isProcessGroupAlive(pgrpId)) {
+ // signaling a process group means using a negative pid.
+ sendSignal("-" + pgrpId, signalNum, signalName);
+ }
+ }
+
+ /**
+ * Sends terminate signal to the process, allowing it to gracefully exit.
+ *
+ * @param pid pid of the process to be sent SIGTERM
+ */
+ public static void terminateProcess(String pid) {
+ maybeSignalProcess(pid, SIGTERM, SIGTERM_STR, true);
+ }
+
+ /**
* Sends terminate signal to all the process belonging to the passed process
* group, allowing the group to gracefully exit.
*
* @param pgrpId process group id
*/
public static void terminateProcessGroup(String pgrpId) {
- ShellCommandExecutor shexec = null;
- try {
- String[] args = { "kill", "--", "-" + pgrpId };
- shexec = new ShellCommandExecutor(args);
- shexec.execute();
- } catch (IOException ioe) {
- LOG.warn("Error executing shell command " + ioe);
- } finally {
- LOG.info("Killing all processes in the process group " + pgrpId +
- " with SIGTERM. Exit code " + shexec.getExitCode());
- }
+ maybeSignalProcessGroup(pgrpId, SIGTERM, SIGTERM_STR, true);
}
/**
@@ -197,22 +241,17 @@
* @param pid process id
*/
public static void killProcess(String pid) {
+ maybeSignalProcess(pid, SIGKILL, SIGKILL_STR, false);
+ }
- //If process tree is not alive then return immediately.
- if(!ProcessTree.isAlive(pid)) {
- return;
- }
- String[] args = { "kill", "-9", pid };
- ShellCommandExecutor shexec = new ShellCommandExecutor(args);
- try {
- shexec.execute();
- } catch (IOException e) {
- LOG.warn("Error sending SIGKILL to process "+ pid + " ."+
- StringUtils.stringifyException(e));
- } finally {
- LOG.info("Killing process " + pid + " with SIGKILL. Exit code "
- + shexec.getExitCode());
- }
+ /**
+ * Sends SIGQUIT to process; Java programs will dump their stack to
+ * stdout.
+ *
+ * @param pid process id
+ */
+ public static void sigQuitProcess(String pid) {
+ maybeSignalProcess(pid, SIGQUIT, SIGQUIT_STR, false);
}
/**
@@ -222,25 +261,20 @@
* @param pgrpId process group id
*/
public static void killProcessGroup(String pgrpId) {
+ maybeSignalProcessGroup(pgrpId, SIGKILL, SIGKILL_STR, false);
+ }
- //If process tree is not alive then return immediately.
- if(!ProcessTree.isProcessGroupAlive(pgrpId)) {
- return;
- }
-
- String[] args = { "kill", "-9", "-"+pgrpId };
- ShellCommandExecutor shexec = new ShellCommandExecutor(args);
- try {
- shexec.execute();
- } catch (IOException e) {
- LOG.warn("Error sending SIGKILL to process group "+ pgrpId + " ."+
- StringUtils.stringifyException(e));
- } finally {
- LOG.info("Killing process group" + pgrpId + " with SIGKILL. Exit code "
- + shexec.getExitCode());
- }
+ /**
+ * Sends SIGQUIT to all processes belonging to the same process group,
+ * ordering all processes in the group to send their stack dump to
+ * stdout.
+ *
+ * @param pgrpId process group id
+ */
+ public static void sigQuitProcessGroup(String pgrpId) {
+ maybeSignalProcessGroup(pgrpId, SIGQUIT, SIGQUIT_STR, false);
}
-
+
/**
* Is the process with PID pid still alive?
* This method assumes that isAlive is called on a pid that was alive not
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/util/ProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/util/ProcfsBasedProcessTree.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/util/ProcfsBasedProcessTree.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/util/ProcfsBasedProcessTree.java Tue Jan 26 14:02:53 2010
@@ -46,8 +46,10 @@
private static final String PROCFS = "/proc/";
- private static final Pattern PROCFS_STAT_FILE_FORMAT = Pattern
- .compile("^([0-9-]+)\\s([^\\s]+)\\s[^\\s]\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+\\s){16}([0-9]+)\\s([0-9]+)(\\s[0-9-]+){15}");
+ private static final Pattern PROCFS_STAT_FILE_FORMAT = Pattern .compile(
+ "^([0-9-]+)\\s([^\\s]+)\\s[^\\s]\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+)\\s" +
+ "([0-9-]+\\s){7}([0-9]+)\\s([0-9]+)\\s([0-9-]+\\s){7}([0-9]+)\\s([0-9]+)" +
+ "(\\s[0-9-]+){15}");
static final String PROCFS_STAT_FILE = "stat";
static final String PROCFS_CMDLINE_FILE = "cmdline";
@@ -65,12 +67,28 @@
PAGE_SIZE = pageSize;
}
}
+ public static final long JIFFY_LENGTH_IN_MILLIS; // in milliseconds
+ static {
+ ShellCommandExecutor shellExecutor =
+ new ShellCommandExecutor(new String[]{"getconf", "CLK_TCK"});
+ long jiffiesPerSecond = -1;
+ try {
+ shellExecutor.execute();
+ jiffiesPerSecond = Long.parseLong(shellExecutor.getOutput().replace("\n", ""));
+ } catch (IOException e) {
+ LOG.error(StringUtils.stringifyException(e));
+ } finally {
+ JIFFY_LENGTH_IN_MILLIS = jiffiesPerSecond != -1 ?
+ Math.round(1000D / jiffiesPerSecond) : -1;
+ }
+ }
// to enable testing, using this variable which can be configured
// to a test directory.
private String procfsDir;
private Integer pid = -1;
+ private Long cpuTime = 0L;
private boolean setsidUsed = false;
private long sleeptimeBeforeSigkill = DEFAULT_SLEEPTIME_BEFORE_SIGKILL;
@@ -195,11 +213,12 @@
pInfoQueue.addAll(pInfo.getChildren());
}
- // update age values.
+ // update age values and compute the number of jiffies since last update
for (Map.Entry<Integer, ProcessInfo> procs : processTree.entrySet()) {
ProcessInfo oldInfo = oldProcs.get(procs.getKey());
- if (oldInfo != null) {
- if (procs.getValue() != null) {
+ if (procs.getValue() != null) {
+ procs.getValue().updateJiffy(oldInfo);
+ if (oldInfo != null) {
procs.getValue().updateAge(oldInfo);
}
}
@@ -324,7 +343,7 @@
}
private static final String PROCESSTREE_DUMP_FORMAT =
- "\t|- %d %d %d %d %s %d %d %s\n";
+ "\t|- %d %d %d %d %s %d %d %d %d %s\n";
/**
* Get a dump of the process-tree.
@@ -336,12 +355,14 @@
StringBuilder ret = new StringBuilder();
// The header.
ret.append(String.format("\t|- PID PPID PGRPID SESSID CMD_NAME "
- + "VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE\n"));
+ + "USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) "
+ + "RSSMEM_USAGE(PAGES) FULL_CMD_LINE\n"));
for (ProcessInfo p : processTree.values()) {
if (p != null) {
ret.append(String.format(PROCESSTREE_DUMP_FORMAT, p.getPid(), p
.getPpid(), p.getPgrpId(), p.getSessionId(), p.getName(), p
- .getVmem(), p.getRssmemPage(), p.getCmdLine(procfsDir)));
+ .getUtime(), p.getStime(), p.getVmem(), p.getRssmemPage(), p
+ .getCmdLine(procfsDir)));
}
}
return ret.toString();
@@ -412,6 +433,27 @@
return totalPages * PAGE_SIZE; // convert # pages to byte
}
+ /**
+ * Get the CPU time in milliseconds used by all the processes in the
+ * process-tree since the process-tree was created.
+ *
+ * @return cumulative CPU time in milliseconds since the process-tree was
+ * created, or 0 if it cannot be calculated
+ */
+ public long getCumulativeCpuTime() {
+ if (JIFFY_LENGTH_IN_MILLIS < 0) {
+ return 0;
+ }
+ long incJiffies = 0;
+ for (ProcessInfo p : processTree.values()) {
+ if (p != null) {
+ incJiffies += p.dtime;
+ }
+ }
+ cpuTime += incJiffies * JIFFY_LENGTH_IN_MILLIS;
+ return cpuTime;
+ }
+
private static Integer getValidPID(String pid) {
Integer retPid = -1;
try {
@@ -481,12 +523,12 @@
Matcher m = PROCFS_STAT_FILE_FORMAT.matcher(str);
boolean mat = m.find();
if (mat) {
- // Set ( name ) ( ppid ) ( pgrpId ) (session ) (vsize ) (rss)
- pinfo.updateProcessInfo(m.group(2), Integer.parseInt(m.group(3)), Integer
- .parseInt(m.group(4)), Integer.parseInt(m.group(5)), Long
- .parseLong(m.group(7)), Long.parseLong(m.group(8)));
- }
- else {
+ // Set (name) (ppid) (pgrpId) (session) (utime) (stime) (vsize) (rss)
+ pinfo.updateProcessInfo(m.group(2), Integer.parseInt(m.group(3)),
+ Integer.parseInt(m.group(4)), Integer.parseInt(m.group(5)),
+ Long.parseLong(m.group(7)), Long.parseLong(m.group(8)),
+ Long.parseLong(m.group(10)), Long.parseLong(m.group(11)));
+ } else {
LOG.warn("Unexpected: procfs stat file is not in the expected format"
+ " for process with pid " + pinfo.getPid());
ret = null;
@@ -536,8 +578,17 @@
private Integer sessionId; // session-id
private Long vmem; // virtual memory usage
private Long rssmemPage; // rss memory usage in # of pages
+ private Long utime = 0L; // # of jiffies in user mode
+ private Long stime = 0L; // # of jiffies in kernel mode
// how many times has this process been seen alive
private int age;
+
+ // # of jiffies used since last update:
+ private Long dtime = 0L;
+ // dtime = (utime + stime) - (utimeOld + stimeOld)
+ // We need this to compute the cumulative CPU time
+ // because the subprocess may finish earlier than root process
+
private List<ProcessInfo> children = new ArrayList<ProcessInfo>(); // list of children
public ProcessInfo(int pid) {
@@ -570,6 +621,18 @@
return vmem;
}
+ public Long getUtime() {
+ return utime;
+ }
+
+ public Long getStime() {
+ return stime;
+ }
+
+ public Long getDtime() {
+ return dtime;
+ }
+
public Long getRssmemPage() { // get rss # of pages
return rssmemPage;
}
@@ -586,15 +649,22 @@
}
public void updateProcessInfo(String name, Integer ppid, Integer pgrpId,
- Integer sessionId, Long vmem, Long rssmem) {
+ Integer sessionId, Long utime, Long stime, Long vmem, Long rssmem) {
this.name = name;
this.ppid = ppid;
this.pgrpId = pgrpId;
this.sessionId = sessionId;
+ this.utime = utime;
+ this.stime = stime;
this.vmem = vmem;
this.rssmemPage = rssmem;
}
+ public void updateJiffy(ProcessInfo oldInfo) {
+ this.dtime = (oldInfo == null ? this.utime + this.stime
+ : (this.utime + this.stime) - (oldInfo.utime + oldInfo.stime));
+ }
+
public void updateAge(ProcessInfo oldInfo) {
this.age = oldInfo.age + 1;
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/checkstyle-noframes-sorted.xsl
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/checkstyle-noframes-sorted.xsl?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/checkstyle-noframes-sorted.xsl (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/checkstyle-noframes-sorted.xsl Tue Jan 26 14:02:53 2010
@@ -1,4 +1,21 @@
<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
<xsl:output method="html" indent="yes"/>
<xsl:decimal-format decimal-separator="." grouping-separator="," />
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/checkstyle.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/checkstyle.xml?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/checkstyle.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/checkstyle.xml Tue Jan 26 14:02:53 2010
@@ -1,4 +1,21 @@
<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
<!DOCTYPE module PUBLIC
"-//Puppy Crawl//DTD Check Configuration 1.2//EN"
"http://www.puppycrawl.com/dtds/configuration_1_2.dtd">
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/findbugsExcludeFile.xml?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/findbugsExcludeFile.xml Tue Jan 26 14:02:53 2010
@@ -1,3 +1,20 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
<FindBugsFilter>
<Match>
<Package name="org.apache.hadoop.record.compiler.generated" />
@@ -43,6 +60,10 @@
<Class name="org.apache.hadoop.mapred.taskdetails_jsp" />
<Bug code="HRS" />
</Match>
+ <Match>
+ <Class name="org.apache.hadoop.mapred.jobdetails_jsp"/>
+ <Bug pattern="HRS_REQUEST_PARAMETER_TO_HTTP_HEADER"/>
+ </Match>
<!--
Ignore warnings where child class has the same name as
super class. Classes based on Old API shadow names from
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/hadoop-policy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/hadoop-policy.xml?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/hadoop-policy.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/hadoop-policy.xml Tue Jan 26 14:02:53 2010
@@ -1,4 +1,21 @@
<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Put site-specific property overrides in this file. -->
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/log4j.properties
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/log4j.properties?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/log4j.properties (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/log4j.properties Tue Jan 26 14:02:53 2010
@@ -1,3 +1,15 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
# log4j configuration used during build and unit tests
log4j.rootLogger=info,stdout
Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jan 26 14:02:53 2010
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/test/mapred:713112
/hadoop/core/trunk/src/test/mapred:776175-785643
-/hadoop/mapreduce/trunk/src/test/mapred:804974-885774
+/hadoop/mapreduce/trunk/src/test/mapred:804974-903221
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred-site.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred-site.xml?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred-site.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred-site.xml Tue Jan 26 14:02:53 2010
@@ -1,4 +1,21 @@
<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Put site-specific property overrides in this file. -->
@@ -19,4 +36,9 @@
<value>false</value>
<description></description>
</property>
+<property>
+ <name>mapreduce.jobtracker.staging.root.dir</name>
+ <value>${hadoop.tmp.dir}/staging</value>
+ <description></description>
+</property>
</configuration>
Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/fs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jan 26 14:02:53 2010
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/test/mapred/org/apache/hadoop/fs:713112
/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/fs:776175-785643
-/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs:807679-885774
+/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs:807679-903221
Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jan 26 14:02:53 2010
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/test/mapred/org/apache/hadoop/hdfs:713112
/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/hdfs:776175-785643
-/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/hdfs:807679-885774
+/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/hdfs:807679-903221
Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/io/FileBench.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jan 26 14:02:53 2010
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/test/mapred/org/apache/hadoop/io/FileBench.java:713112
/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/io/FileBench.java:776175-785643
-/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/io/FileBench.java:817878-835934,884917-885774
+/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/io/FileBench.java:817878-835934,884917-903221
Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/io/TestSequenceFileMergeProgress.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jan 26 14:02:53 2010
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/test/mapred/org/apache/hadoop/io/TestSequenceFileMergeProgress.java:713112
/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/io/TestSequenceFileMergeProgress.java:776175-785643
-/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/io/TestSequenceFileMergeProgress.java:817878-835934,884917-885774
+/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/io/TestSequenceFileMergeProgress.java:817878-835934,884917-903221
Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/ipc/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jan 26 14:02:53 2010
@@ -1,4 +1,4 @@
/hadoop/core/branches/branch-0.19/hdfs/src/test/hdfs-with-mr/org/apache/hadoop/ipc:713112
/hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/ipc:776175-784663
/hadoop/hdfs/branches/HDFS-265/src/test/hdfs-with-mr/org/apache/hadoop/ipc:796829-820463
-/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/ipc:807679-885774
+/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/ipc:807679-903221
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java Tue Jan 26 14:02:53 2010
@@ -28,11 +28,14 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
import junit.framework.TestCase;
@@ -48,7 +51,10 @@
* <li>Make the built binary to setuid executable</li>
* <li>Execute following targets:
* <code>ant test -Dcompile.c++=true -Dtaskcontroller-path=<em>path to built binary</em>
- * -Dtaskcontroller-ugi=<em>user,group</em></code></li>
+ * -Dtaskcontroller-ugi=<em>user,group</em></code>
+ * <br/>(Note that "path to built binary" means the directory containing task-controller -
+ * not the actual complete path of the binary itself. This path must end in ".../bin")
+ * </li>
* </ol>
*
*/
@@ -72,6 +78,24 @@
void setTaskControllerExe(String execPath) {
this.taskControllerExePath = execPath;
}
+
+ volatile static int attemptedSigQuits = 0;
+ volatile static int failedSigQuits = 0;
+
+ /** Work like LinuxTaskController, but also count the number of
+ * attempted and failed SIGQUIT sends via the task-controller
+ * executable.
+ */
+ @Override
+ void dumpTaskStack(TaskControllerContext context) {
+ attemptedSigQuits++;
+ try {
+ signalTask(context, TaskCommands.SIGQUIT_TASK_JVM);
+ } catch (Exception e) {
+ LOG.warn("Execution sending SIGQUIT: " + StringUtils.stringifyException(e));
+ failedSigQuits++;
+ }
+ }
}
// cluster instances which sub classes can use
@@ -120,10 +144,10 @@
String[] splits = ugi.split(",");
taskControllerUser = new UnixUserGroupInformation(splits);
clusterConf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
- createHomeDirectory(clusterConf);
+ createHomeAndStagingDirectory(clusterConf);
}
- private void createHomeDirectory(JobConf conf)
+ private void createHomeAndStagingDirectory(JobConf conf)
throws IOException {
FileSystem fs = dfsCluster.getFileSystem();
String path = "/user/" + taskControllerUser.getUserName();
@@ -131,6 +155,10 @@
LOG.info("Creating Home directory : " + homeDirectory);
fs.mkdirs(homeDirectory);
changePermission(conf, homeDirectory);
+ Path stagingArea = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT));
+ LOG.info("Creating Staging root directory : " + stagingArea);
+ fs.mkdirs(stagingArea);
+ fs.setPermission(stagingArea, new FsPermission((short)0777));
}
private void changePermission(JobConf conf, Path p)
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java Tue Jan 26 14:02:53 2010
@@ -29,11 +29,11 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TaskStatus.Phase;
-import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.Job.RawSplit;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
+import org.apache.hadoop.mapreduce.split.JobSplit;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
/**
* Utilities used in unit test.
@@ -67,8 +67,10 @@
}
@Override
public ClusterStatus getClusterStatus(boolean detailed) {
- return new ClusterStatus(trackers.length,
- 0, 0, 0, 0, totalSlots/2, totalSlots/2, JobTracker.State.RUNNING, 0);
+ return new ClusterStatus(
+ taskTrackers().size() - getBlacklistedTrackerCount(),
+ getBlacklistedTrackerCount(), 0, 0, 0, totalSlots/2, totalSlots/2,
+ JobTracker.State.RUNNING, 0);
}
public void setNumSlots(int totalSlots) {
@@ -77,7 +79,6 @@
}
static class FakeJobInProgress extends JobInProgress {
- Job.RawSplit[] rawSplits;
@SuppressWarnings("deprecation")
FakeJobInProgress(JobConf jobConf, JobTracker tracker) throws IOException {
super(new JobID(jtIdentifier, ++jobCounter), jobConf, tracker);
@@ -91,27 +92,27 @@
@Override
public synchronized void initTasks() throws IOException {
- Job.RawSplit[] splits = createSplits();
- numMapTasks = splits.length;
- createMapTasks(null, splits);
- nonRunningMapCache = createCache(splits, maxLevel);
+ TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(jobId);
+ numMapTasks = taskSplitMetaInfo.length;
+ createMapTasks(null, taskSplitMetaInfo);
+ nonRunningMapCache = createCache(taskSplitMetaInfo, maxLevel);
createReduceTasks(null);
tasksInited.set(true);
this.status.setRunState(JobStatus.RUNNING);
}
@Override
- Job.RawSplit[] createSplits(){
- Job.RawSplit[] splits = new Job.RawSplit[numMapTasks];
+ TaskSplitMetaInfo [] createSplits(org.apache.hadoop.mapreduce.JobID jobId){
+ TaskSplitMetaInfo[] splits =
+ new TaskSplitMetaInfo[numMapTasks];
for (int i = 0; i < numMapTasks; i++) {
- splits[i] = new Job.RawSplit();
- splits[i].setLocations(new String[0]);
+ splits[i] = JobSplit.EMPTY_TASK_SPLIT;
}
return splits;
}
@Override
- protected void createMapTasks(String ignored, Job.RawSplit[] splits) {
+ protected void createMapTasks(String ignored, TaskSplitMetaInfo[] splits) {
maps = new TaskInProgress[numMapTasks];
for (int i = 0; i < numMapTasks; i++) {
maps[i] = new TaskInProgress(getJobID(), "test",
@@ -218,6 +219,15 @@
updateTaskStatus(tip, status);
}
+ public void killTask(TaskAttemptID taskId) {
+ TaskInProgress tip = jobtracker.taskidToTIPMap.get(taskId);
+ TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), taskId,
+ 1.0f, 1, TaskStatus.State.KILLED, "", "", tip
+ .machineWhereTaskRan(taskId), tip.isMapTask() ? Phase.MAP
+ : Phase.REDUCE, new Counters());
+ updateTaskStatus(tip, status);
+ }
+
public void cleanUpMetrics() {
}
@@ -253,7 +263,7 @@
numSlotsRequired);
}
- public FakeTaskInProgress(JobID jobId, String jobFile, RawSplit emptySplit,
+ public FakeTaskInProgress(JobID jobId, String jobFile, TaskSplitMetaInfo emptySplit,
JobTracker jobTracker, JobConf jobConf,
JobInProgress job, int partition, int numSlotsRequired) {
super(jobId, jobFile, emptySplit, jobTracker, jobConf, job,
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java Tue Jan 26 14:02:53 2010
@@ -141,16 +141,16 @@
org.apache.hadoop.mapreduce.GenericMRLoadGenerator.INDIRECT_INPUT_FORMAT,
null)) {
// specified IndirectInputFormat? Build src list
- JobClient jClient = new JobClient(job);
- Path sysdir = jClient.getSystemDir();
+ JobClient jClient = new JobClient(job);
+ Path tmpDir = new Path(jClient.getFs().getHomeDirectory(), ".staging");
Random r = new Random();
- Path indirInputFile = new Path(sysdir,
+ Path indirInputFile = new Path(tmpDir,
Integer.toString(r.nextInt(Integer.MAX_VALUE), 36) + "_files");
job.set(
org.apache.hadoop.mapreduce.GenericMRLoadGenerator.INDIRECT_INPUT_FILE,
indirInputFile.toString());
SequenceFile.Writer writer = SequenceFile.createWriter(
- sysdir.getFileSystem(job), job, indirInputFile,
+ tmpDir.getFileSystem(job), job, indirInputFile,
LongWritable.class, Text.class,
SequenceFile.CompressionType.NONE);
try {
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java Tue Jan 26 14:02:53 2010
@@ -189,7 +189,7 @@
conf.set(MRConfig.LOCAL_DIR, localPath.toString());
LOG.info(MRConfig.LOCAL_DIR + " is " + localPath);
try {
- tt = new TaskTracker(conf);
+ tt = createTaskTracker(conf);
isInitialized = true;
} catch (Throwable e) {
isDead = true;
@@ -199,6 +199,13 @@
}
/**
+ * Creates a default {@link TaskTracker} using the conf passed.
+ */
+ TaskTracker createTaskTracker(JobConf conf) throws IOException {
+ return new TaskTracker(conf);
+ }
+
+ /**
* Create and run the task tracker.
*/
public void run() {
@@ -268,7 +275,18 @@
public int getNumTaskTrackers() {
return taskTrackerList.size();
}
-
+
+ /**
+ * Sets inline cleanup threads on all task trackers so that deletion of
+ * temporary files/dirs happens inline
+ */
+ public void setInlineCleanupThreads() {
+ for (int i = 0; i < getNumTaskTrackers(); i++) {
+ getTaskTrackerRunner(i).getTaskTracker().setCleanupThread(
+ new UtilsForTests.InlineCleanupQueue());
+ }
+ }
+
/**
* Wait until the system is idle.
*/
@@ -671,6 +689,13 @@
TaskTrackerRunner taskTracker;
taskTracker = new TaskTrackerRunner(idx, numDir, host, conf);
+ addTaskTracker(taskTracker);
+ }
+
+ /**
+ * Add a task-tracker to the Mini-MR cluster.
+ */
+ void addTaskTracker(TaskTrackerRunner taskTracker) {
Thread taskTrackerThread = new Thread(taskTracker);
taskTrackerList.add(taskTracker);
taskTrackerThreadList.add(taskTrackerThread);
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/NotificationTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/NotificationTestCase.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/NotificationTestCase.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/NotificationTestCase.java Tue Jan 26 14:02:53 2010
@@ -32,7 +32,6 @@
import javax.servlet.ServletException;
import java.io.IOException;
import java.io.DataOutputStream;
-import java.util.Date;
/**
* Base class to test Job end notification in local and cluster mode.
@@ -51,17 +50,12 @@
*/
public abstract class NotificationTestCase extends HadoopTestCase {
- private static void stdPrintln(String s) {
- //System.out.println(s);
- }
-
protected NotificationTestCase(int mode) throws IOException {
super(mode, HadoopTestCase.LOCAL_FS, 1, 1);
}
private int port;
private String contextPath = "/notification";
- private Class servletClass = NotificationServlet.class;
private String servletPath = "/mapred";
private Server webServer;
@@ -118,15 +112,9 @@
break;
}
if (counter % 2 == 0) {
- stdPrintln((new Date()).toString() +
- "Receiving First notification for [" + req.getQueryString() +
- "], returning error");
res.sendError(HttpServletResponse.SC_BAD_REQUEST, "forcing error");
}
else {
- stdPrintln((new Date()).toString() +
- "Receiving Second notification for [" + req.getQueryString() +
- "], returning OK");
res.setStatus(HttpServletResponse.SC_OK);
}
counter++;
@@ -160,10 +148,7 @@
public void testMR() throws Exception {
System.out.println(launchWordCount(this.createJobConf(),
"a b c d e f g h", 1, 1));
- synchronized(Thread.currentThread()) {
- stdPrintln("Sleeping for 2 seconds to give time for retry");
- Thread.currentThread().sleep(2000);
- }
+ Thread.sleep(2000);
assertEquals(2, NotificationServlet.counter);
Path inDir = new Path("notificationjob/input");
@@ -180,19 +165,13 @@
// run a job with KILLED status
System.out.println(UtilsForTests.runJobKill(this.createJobConf(), inDir,
outDir).getID());
- synchronized(Thread.currentThread()) {
- stdPrintln("Sleeping for 2 seconds to give time for retry");
- Thread.currentThread().sleep(2000);
- }
+ Thread.sleep(2000);
assertEquals(4, NotificationServlet.counter);
// run a job with FAILED status
System.out.println(UtilsForTests.runJobFail(this.createJobConf(), inDir,
outDir).getID());
- synchronized(Thread.currentThread()) {
- stdPrintln("Sleeping for 2 seconds to give time for retry");
- Thread.currentThread().sleep(2000);
- }
+ Thread.sleep(2000);
assertEquals(6, NotificationServlet.counter);
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java Tue Jan 26 14:02:53 2010
@@ -20,6 +20,7 @@
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.lib.*;
+import org.apache.hadoop.mapreduce.lib.fieldsel.FieldSelectionHelper;
import org.apache.hadoop.mapreduce.lib.fieldsel.TestMRFieldSelection;
import junit.framework.TestCase;
@@ -72,9 +73,9 @@
job.setOutputFormat(TextOutputFormat.class);
job.setNumReduceTasks(1);
- job.set("mapreduce.fieldsel.data.field.separator", "-");
- job.set("mapreduce.fieldsel.mapreduce.fieldsel.map.output.key.value.fields.spec", "6,5,1-3:0-");
- job.set("mapreduce.fieldsel.mapreduce.fieldsel.reduce.output.key.value.fields.spec", ":4,3,2,1,0,0-");
+ job.set(FieldSelectionHelper.DATA_FIELD_SEPERATOR, "-");
+ job.set(FieldSelectionHelper.MAP_OUTPUT_KEY_VALUE_SPEC, "6,5,1-3:0-");
+ job.set(FieldSelectionHelper.REDUCE_OUTPUT_KEY_VALUE_SPEC, ":4,3,2,1,0,0-");
JobClient.runJob(job);
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java Tue Jan 26 14:02:53 2010
@@ -58,6 +58,9 @@
namenode = fileSys.getUri().toString();
mr = new MiniMRCluster(10, namenode, 3,
null, null, mrConf);
+ // make cleanup inline so that the existence of these directories
+ // can be validated
+ mr.setInlineCleanupThreads();
final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
JobConf jobConf = mr.createJobConf();
runSleepJob(jobConf);
@@ -78,19 +81,8 @@
"/taskTracker/jobcache";
File jobDir = new File(jobDirStr);
String[] contents = jobDir.list();
- if (contents == null || contents.length == 0) {
- return;
- }
- while (contents.length > 0) {
- try {
- Thread.sleep(1000);
- LOG.warn(jobDir +" not empty yet, contents are");
- for (String s: contents) {
- LOG.info(s);
- }
- contents = jobDir.list();
- } catch (InterruptedException ie){}
- }
+ assertTrue("Contents of " + jobDir + " not cleanup.",
+ (contents == null || contents.length == 0));
}
}
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java Tue Jan 26 14:02:53 2010
@@ -22,6 +22,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.util.ToolRunner;
@@ -92,4 +93,30 @@
}
}
}
+
+ /** Ensure that SIGQUIT can be properly sent by the LinuxTaskController
+ * if a task times out.
+ */
+ public void testTimeoutStackTrace() throws Exception {
+ if (!shouldRun()) {
+ return;
+ }
+
+ // Run a job that should timeout and trigger a SIGQUIT.
+ startCluster();
+ JobConf conf = getClusterConf();
+ conf.setInt(JobContext.TASK_TIMEOUT, 10000);
+ conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 50);
+ SleepJob sleepJob = new SleepJob();
+ sleepJob.setConf(conf);
+ Job job = sleepJob.createJob(1, 0, 30000, 1, 0, 0);
+ job.setMaxMapAttempts(1);
+ int prevNumSigQuits = MyLinuxTaskController.attemptedSigQuits;
+ job.waitForCompletion(true);
+ assertTrue("Did not detect a new SIGQUIT!",
+ prevNumSigQuits < MyLinuxTaskController.attemptedSigQuits);
+ assertEquals("A SIGQUIT attempt failed!", 0,
+ MyLinuxTaskController.failedSigQuits);
+
+ }
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java Tue Jan 26 14:02:53 2010
@@ -40,10 +40,9 @@
import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
import org.apache.hadoop.mapred.TaskStatus.Phase;
import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
-import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
-import org.apache.hadoop.mapreduce.Job.RawSplit;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.StaticMapping;
@@ -101,16 +100,14 @@
}
@Override
- Job.RawSplit[] createSplits() {
+ TaskSplitMetaInfo[] createSplits(org.apache.hadoop.mapreduce.JobID jobId) {
// Set all splits to reside on one host. This will ensure that
// one tracker gets data local, one gets rack local and two others
// get non-local maps
- Job.RawSplit[] splits = new Job.RawSplit[numMapTasks];
+ TaskSplitMetaInfo[] splits = new TaskSplitMetaInfo[numMapTasks];
String[] splitHosts0 = new String[] { hosts[0] };
for (int i = 0; i < numMapTasks; i++) {
- splits[i] = new Job.RawSplit();
- splits[i].setDataLength(0);
- splits[i].setLocations(splitHosts0);
+ splits[i] = new TaskSplitMetaInfo(splitHosts0, 0, 0);
}
return splits;
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java Tue Jan 26 14:02:53 2010
@@ -18,12 +18,20 @@
package org.apache.hadoop.mapred;
+import java.io.BufferedReader;
+import java.io.FileInputStream;
import java.io.File;
+import java.io.InputStreamReader;
import java.io.IOException;
import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.SleepJob;
/**
* A JUnit test to test Kill Job & Fail Job functionality with local file
@@ -31,38 +39,96 @@
*/
public class TestJobKillAndFail extends TestCase {
+ static final Log LOG = LogFactory.getLog(TestJobKillAndFail.class);
+
private static String TEST_ROOT_DIR = new File(System.getProperty(
"test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
- public void testJobFailAndKill() throws IOException {
+ /**
+ * TaskController instance that counts stack-dump requests (numStackDumps)
+ * before delegating each one to DefaultTaskController.
+ */
+ static class MockStackDumpTaskController extends DefaultTaskController {
+
+ static volatile int numStackDumps = 0;
+
+ static final Log LOG = LogFactory.getLog(TestJobKillAndFail.class);
+
+ public MockStackDumpTaskController() {
+ LOG.info("Instantiated MockStackDumpTC");
+ }
+
+ @Override
+ void dumpTaskStack(TaskControllerContext context) {
+ LOG.info("Got stack-dump request in TaskController");
+ MockStackDumpTaskController.numStackDumps++;
+ super.dumpTaskStack(context);
+ }
+
+ }
+
+ /** Check whether dumpTaskStack() was called since lastNumDumps was
+ * recorded: the counter must have increased if expectDump is true, and
+ * must be unchanged otherwise. */
+ private void checkForStackDump(boolean expectDump, int lastNumDumps) {
+ int curNumDumps = MockStackDumpTaskController.numStackDumps;
+
+ LOG.info("curNumDumps=" + curNumDumps + "; lastNumDumps=" + lastNumDumps
+ + "; expect=" + expectDump);
+
+ if (expectDump) {
+ assertTrue("No stack dump recorded!", lastNumDumps < curNumDumps);
+ } else {
+ assertTrue("Stack dump happened anyway!", lastNumDumps == curNumDumps);
+ }
+ }
+
+ public void testJobFailAndKill() throws Exception {
MiniMRCluster mr = null;
try {
JobConf jtConf = new JobConf();
jtConf.set("mapred.jobtracker.instrumentation",
JTInstrumentation.class.getName());
+ jtConf.set("mapreduce.tasktracker.taskcontroller",
+ MockStackDumpTaskController.class.getName());
mr = new MiniMRCluster(2, "file:///", 3, null, null, jtConf);
JTInstrumentation instr = (JTInstrumentation)
mr.getJobTrackerRunner().getJobTracker().getInstrumentation();
// run the TCs
JobConf conf = mr.createJobConf();
+ conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 50);
Path inDir = new Path(TEST_ROOT_DIR + "/failkilljob/input");
Path outDir = new Path(TEST_ROOT_DIR + "/failkilljob/output");
- RunningJob job = UtilsForTests.runJobFail(conf, inDir, outDir);
+ RunningJob runningJob = UtilsForTests.runJobFail(conf, inDir, outDir);
// Checking that the Job got failed
- assertEquals(job.getJobState(), JobStatus.FAILED);
+ assertEquals(runningJob.getJobState(), JobStatus.FAILED);
assertTrue(instr.verifyJob());
assertEquals(1, instr.failed);
instr.reset();
-
- job = UtilsForTests.runJobKill(conf, inDir, outDir);
+ int prevNumDumps = MockStackDumpTaskController.numStackDumps;
+ runningJob = UtilsForTests.runJobKill(conf, inDir, outDir);
// Checking that the Job got killed
- assertTrue(job.isComplete());
- assertEquals(job.getJobState(), JobStatus.KILLED);
+ assertTrue(runningJob.isComplete());
+ assertEquals(runningJob.getJobState(), JobStatus.KILLED);
assertTrue(instr.verifyJob());
assertEquals(1, instr.killed);
+ // check that job kill does not put a stacktrace in task logs.
+ checkForStackDump(false, prevNumDumps);
+
+ // Test that a task that times out does have a stack trace
+ conf = mr.createJobConf();
+ conf.setInt(JobContext.TASK_TIMEOUT, 10000);
+ conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 50);
+ SleepJob sleepJob = new SleepJob();
+ sleepJob.setConf(conf);
+ Job job = sleepJob.createJob(1, 0, 30000, 1,0, 0);
+ job.setMaxMapAttempts(1);
+ prevNumDumps = MockStackDumpTaskController.numStackDumps;
+ job.waitForCompletion(true);
+ checkForStackDump(true, prevNumDumps);
} finally {
if (mr != null) {
mr.shutdown();
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueClient.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueClient.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueClient.java Tue Jan 26 14:02:53 2010
@@ -17,20 +17,33 @@
*/
package org.apache.hadoop.mapred;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.CONFIG;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.checkForConfigFile;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.createDocument;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.createSimpleDocumentWithAcls;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.miniMRCluster;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.setUpCluster;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.writeToFile;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.File;
+import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
-import org.junit.After;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.QueueInfo;
import org.junit.Test;
+import org.w3c.dom.Document;
public class TestJobQueueClient {
@Test
public void testQueueOrdering() throws Exception {
- System.out.println("in test queue ordering");
// create some sample queues in a hierarchy..
JobQueueInfo[] roots = new JobQueueInfo[2];
roots[0] = new JobQueueInfo("q1", "q1 scheduling info");
@@ -53,7 +66,6 @@
@Test
public void testQueueInfoPrinting() throws Exception {
- System.out.println("in test queue info printing");
// create a test queue with children.
// create some sample queues in a hierarchy..
JobQueueInfo root = new JobQueueInfo("q1", "q1 scheduling info");
@@ -76,4 +88,24 @@
assertEquals(sb.toString(), writer.toString());
}
-}
\ No newline at end of file
+
+  /**
+   * Writes a queue configuration with ACLs, starts a cluster on it, and
+   * checks that JobClient#getQueueInfo returns the configured queue "q1"
+   * and returns null for a queue that does not exist.
+   */
+  @Test
+  public void testGetQueue() throws Exception {
+    checkForConfigFile();
+    Document doc = createDocument();
+    createSimpleDocumentWithAcls(doc, "true");
+    writeToFile(doc, CONFIG);
+    try {
+      Configuration conf = new Configuration();
+      conf.addResource(CONFIG);
+      setUpCluster(conf);
+      JobClient jc = new JobClient(miniMRCluster.createJobConf());
+      // test for existing queue
+      QueueInfo queueInfo = jc.getQueueInfo("q1");
+      // Fail with a clear message rather than an NPE if q1 is missing.
+      assertTrue("queue q1 was not found", queueInfo != null);
+      assertEquals("q1", queueInfo.getQueueName());
+      // try getting a non-existing queue
+      queueInfo = jc.getQueueInfo("queue");
+      assertNull(queueInfo);
+    } finally {
+      // Always remove the config file written above, even if an assertion
+      // fails, so it cannot leak into later tests that use CONFIG.
+      new File(CONFIG).delete();
+    }
+  }
+}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Tue Jan 26 14:02:53 2010
@@ -30,6 +30,7 @@
import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.mapreduce.split.JobSplit;
public class TestJobQueueTaskScheduler extends TestCase {
@@ -81,7 +82,7 @@
public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
int ignored) throws IOException {
TaskAttemptID attemptId = getTaskAttemptID(TaskType.MAP);
- Task task = new MapTask("", attemptId, 0, "", new BytesWritable(), 1) {
+ Task task = new MapTask("", attemptId, 0, new JobSplit.TaskSplitIndex(), 1) {
@Override
public String toString() {
return String.format("%s on %s", getTaskID(), tts.getTrackerName());
|