Experimental: shuffle map outputs by hard-linking them on a shared filesystem

When mapreduce.shuffle.link is true (default false), a reduce task links
the map's output/file.out, located under hadoop.tmp.dir, into its own
local directory with the command in hadoop.ln.cmd (default "ln") instead
of copying the map output over HTTP.

Also part of this local setup: DEBUG logging, start-all.sh no longer
starts the DFS daemons, and site-specific taskcontroller.cfg values.

diff --git a/bin/start-all.sh b/bin/start-all.sh
index 88ce430..c545974 100755
--- a/bin/start-all.sh
+++ b/bin/start-all.sh
@@ -28,7 +28,7 @@ else
 fi
 
 # start dfs daemons
-"$bin"/start-dfs.sh --config $HADOOP_CONF_DIR
+# "$bin"/start-dfs.sh --config $HADOOP_CONF_DIR
 
 # start mapred daemons
 "$bin"/start-mapred.sh --config $HADOOP_CONF_DIR
diff --git a/conf/log4j.properties b/conf/log4j.properties
index 1bac90d..2658c37 100644
--- a/conf/log4j.properties
+++ b/conf/log4j.properties
@@ -1,5 +1,5 @@
 # Define some default values that can be overridden by system properties
-hadoop.root.logger=INFO,console
+hadoop.root.logger=DEBUG,console
 hadoop.log.dir=.
 hadoop.log.file=hadoop.log
 
@@ -107,9 +107,11 @@ log4j.logger.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=WARN
 
 hadoop.metrics.log.level=INFO
 #log4j.logger.org.apache.hadoop.mapred.JobTracker=DEBUG
-#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
+log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
+log4j.logger.org.apache.hadoop.mapred.ReduceTask=DEBUG
 #log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
 log4j.logger.org.apache.hadoop.metrics2=${hadoop.metrics.log.level}
+log4j.logger.org.apache.hadoop.mapred.Child=DEBUG
 
 # Jets3t library
 log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR
diff --git a/conf/taskcontroller.cfg b/conf/taskcontroller.cfg
index a13aeb9..ef85fa9 100644
--- a/conf/taskcontroller.cfg
+++ b/conf/taskcontroller.cfg
@@ -1,4 +1,11 @@
-mapred.local.dir=#configured value of mapred.local.dir. It can be a list of comma separated paths.
-hadoop.log.dir=#configured value of hadoop.log.dir.
-mapred.tasktracker.tasks.sleeptime-before-sigkill=#sleep time before sig kill is to be sent to process group after sigterm is sent. Should be in seconds
-mapreduce.tasktracker.group=#configured value of mapreduce.tasktracker.group.
+#configured value of mapred.local.dir. It can be a list of comma separated paths.
+mapred.local.dir=/mnt/lustre/hadoop-tmpdir
+
+#configured value of hadoop.log.dir.
+hadoop.log.dir=/work/lustre/tasks/Hadoop.update/git/hadoop-common/logs
+
+#sleep time before sig kill is to be sent to process group after sigterm is sent. Should be in seconds
+mapred.tasktracker.tasks.sleeptime-before-sigkill=30
+
+#configured value of mapreduce.tasktracker.group.
+mapreduce.tasktracker.group=0
diff --git a/src/mapred/org/apache/hadoop/mapred/ReduceTask.java b/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
index 980b988..076ad08 100644
--- a/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
+++ b/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
@@ -1524,6 +1524,24 @@ class ReduceTask extends Task {
           LOG.debug("header: " + mapId + ", compressed len: " + compressedLength +
                    ", decompressed len: " + decompressedLength);
         }
+
+        // Shuffle
+        MapOutput mapOutput = null;
+
+        // Experimental: hard-link the map output on a shared filesystem
+        // instead of fetching it over HTTP (see shuffleToLink).
+        boolean shuffleLink = conf.getBoolean("mapreduce.shuffle.link", false);
+
+        if (shuffleLink) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Shuffling " + decompressedLength + " bytes (" +
+                      compressedLength + " raw bytes) " +
+                      "to Link from " + mapOutputLoc.getTaskAttemptId());
+          }
+          mapOutput = shuffleToLink(mapOutputLoc, input, filename,
+                                    decompressedLength);
+          return mapOutput;
+        }
 
         //We will put a file in memory if it meets certain criteria:
         //1. The size of the (decompressed) file should be less than 25% of
@@ -1533,8 +1551,6 @@ class ReduceTask extends Task {
         // Check if this map-output can be saved in-memory
         boolean shuffleInMemory = ramManager.canFitInMemory(decompressedLength);
 
-        // Shuffle
-        MapOutput mapOutput = null;
         if (shuffleInMemory) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("Shuffling " + decompressedLength + " bytes (" +
@@ -1772,6 +1788,116 @@ class ReduceTask extends Task {
         return mapOutput;
       }
 
+      /**
+       * Experimental shuffle path: instead of copying the map output over
+       * HTTP, hard-link the map's output/file.out into a local directory
+       * allocated for this reduce.
+       *
+       * <p>The source is built as
+       * ${hadoop.tmp.dir}/taskTracker/root/jobcache/JOB/ATTEMPT/output/file.out,
+       * where JOB and ATTEMPT are the values of the first two query
+       * parameters of the fetch URL. This assumes hadoop.tmp.dir is on a
+       * filesystem visible to every node. NOTE(review): the "root" user
+       * directory is hard-coded, and map outputs are normally kept under
+       * mapred.local.dir -- confirm both resolve to the same place.
+       *
+       * <p>The link command is read from hadoop.ln.cmd (default "ln") and
+       * run as "CMD SRC DST" without a shell. The HTTP body in
+       * <code>input</code> is never read; the stream is closed on return.
+       *
+       * @throws IOException if the source path cannot be derived, or the
+       *         link command cannot be started, is interrupted, or exits
+       *         with a non-zero status
+       */
+      private MapOutput shuffleToLink(MapOutputLocation mapOutputLoc,
+                                      InputStream input,
+                                      Path filename,
+                                      long mapOutputLength)
+      throws IOException {
+        try {
+          // Find out a suitable location for the output on local-filesystem
+          Path localFilename =
+            lDirAlloc.getLocalPathForWrite(filename.toUri().getPath(),
+                                           mapOutputLength, conf);
+
+          // The fetch URL's query is expected to carry the job id and the map
+          // attempt id as its first two "name=value" parameters.
+          String query = mapOutputLoc.getOutputLocation().getQuery();
+          String[] params = (query == null) ? new String[0] : query.split("&");
+          if (params.length < 2) {
+            throw new IOException("shuffleToLink: cannot derive map output "
+                + "path from query \"" + query + "\"");
+          }
+          String src = conf.get("hadoop.tmp.dir")
+              + "/taskTracker/root/jobcache/"
+              + params[0].substring(params[0].indexOf('=') + 1) + "/"
+              + params[1].substring(params[1].indexOf('=') + 1)
+              + "/output/file.out";
+
+          // Split the configured command (e.g. "ln -f") into argv words and
+          // pass the paths as separate arguments so they are never re-split.
+          java.util.List<String> command = new java.util.ArrayList<String>(
+              java.util.Arrays.asList(
+                  conf.get("hadoop.ln.cmd", "ln").trim().split("\\s+")));
+          command.add(src);
+          command.add(localFilename.toString());
+          runLinkCommand(command);
+
+          return new MapOutput(mapOutputLoc.getTaskId(),
+                               mapOutputLoc.getTaskAttemptId(), conf,
+                               localFileSys.makeQualified(localFilename),
+                               mapOutputLength);
+        } finally {
+          // Nothing is read from the HTTP response body; release it.
+          input.close();
+        }
+      }
+
+      /**
+       * Runs <code>command</code> without a shell, waits for it to exit and
+       * fails if the exit status is non-zero. stderr is merged into stdout and
+       * drained so the child cannot block on a full pipe; the captured output
+       * is included in the exception message.
+       */
+      private void runLinkCommand(java.util.List<String> command)
+      throws IOException {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("shuffleToLink: Command used for hardlink " + command);
+        }
+        ProcessBuilder builder = new ProcessBuilder(command);
+        builder.redirectErrorStream(true);
+        Process proc = builder.start();
+        java.io.ByteArrayOutputStream output =
+            new java.io.ByteArrayOutputStream();
+        int exitCode;
+        try {
+          proc.getOutputStream().close();
+          InputStream procOut = proc.getInputStream();
+          try {
+            byte[] buf = new byte[4096];
+            int n;
+            while ((n = procOut.read(buf)) != -1) {
+              output.write(buf, 0, n);
+            }
+          } finally {
+            procOut.close();
+          }
+          exitCode = proc.waitFor();
+        } catch (InterruptedException ie) {
+          proc.destroy();
+          // Preserve the interrupt status for the caller.
+          Thread.currentThread().interrupt();
+          IOException ioe = new IOException("Interrupted while running " + command);
+          ioe.initCause(ie);
+          throw ioe;
+        }
+        if (exitCode != 0) {
+          throw new IOException("Link command " + command
+              + " exited with status " + exitCode + ": "
+              + output.toString().trim());
+        }
+      }
+
       private MapOutput shuffleToDisk(MapOutputLocation mapOutputLoc,
                                       InputStream input,
                                       Path filename,
diff --git a/src/mapred/org/apache/hadoop/mapred/TaskRunner.java b/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
index ca54508..c9a9677 100644
--- a/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
+++ b/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
@@ -248,6 +248,8 @@ abstract class TaskRunner extends Thread {
       }
 
       setupCmds.add(setup);
+      LOG.info("JVM to start: " + setupCmds.toString() + "; workdir = "
+          + workDir);
       launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir);
       tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
       if (exitCodeSet) {