Message-ID: <4FB36694.3060705@xyratex.com>
Date: Wed, 16 May 2012 12:34:28 +0400
From: Alexander Zarochentsev
To: mapreduce-dev@hadoop.apache.org, Nathan Rutman
Subject: Hadoop optimization for Lustre FS

Hello,

there is an optimization for Hadoop on the Lustre FS, or any other high-performance distributed filesystem. The research paper with test results can be found here:

http://www.xyratex.com/pdfs/whitepapers/Xyratex_white_paper_MapReduce_1-4.pdf

and a presentation from LUG 2011:

http://www.olcf.ornl.gov/wp-content/events/lug2011/4-12-2011/1100-1130_Nathan_Rutman_MapReduce_Lug_2011.pptx

Basically, the optimization replaces the HTTP transport in the shuffle phase with a simple hard link from the reduce-side target file to the map-side source file; this works because with Lustre every node sees the same shared filesystem. I have attached a draft patch against hadoop-1.0.0 to illustrate the idea.

How do I push this patch upstream?

Thanks,
-- 
Alexander "Zam" Zarochentsev
alexander_zarochentsev@xyratex.com
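For readers who want the idea without reading the full patch, here is a minimal, self-contained sketch of the link-based shuffle. It assumes Java 7+ (java.nio.file) and uses purely illustrative paths on a shared Lustre mount; the attached patch instead targets the Java 6 hadoop-1.0.0 code base and shells out to a configured ln command.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/**
 * Illustrative sketch only (not the attached patch): when map and reduce
 * tasks share one filesystem (e.g. a Lustre mount used as mapred.local.dir),
 * the reducer can "fetch" a map output by hard-linking the mapper's
 * file.out into its own output location instead of copying it over HTTP.
 * All paths below are hypothetical examples.
 */
public class HardLinkShuffleSketch {

  /** Hard-link src to dst, falling back to a plain copy if linking fails. */
  static void linkOrCopy(Path src, Path dst) throws IOException {
    try {
      // Zero-copy "shuffle": both paths live on the same shared mount,
      // so the link is pure metadata work and no data crosses the network.
      Files.createLink(dst, src);
    } catch (UnsupportedOperationException | IOException e) {
      // Linking not supported or not possible: fall back to copying bytes.
      Files.copy(src, dst);
    }
  }

  public static void main(String[] args) throws IOException {
    // Hypothetical locations on a shared Lustre mount.
    Path mapOutput    = Paths.get("/mnt/lustre/hadoop-tmpdir/taskTracker/user/"
        + "jobcache/job_0001/attempt_0001_m_000000_0/output/file.out");
    Path reduceTarget = Paths.get("/mnt/lustre/hadoop-tmpdir/taskTracker/user/"
        + "jobcache/job_0001/attempt_0001_r_000000_0/output/map_0.out");
    linkOrCopy(mapOutput, reduceTarget);
  }
}

The point is that once mapred.local.dir lives on the same shared filesystem for every node, "copying" a map output is just metadata work, so the HTTP fetch in the reducer's copy phase can be skipped entirely.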
[Attachment: hardlink-2012.05.15.patch]

diff --git a/bin/start-all.sh b/bin/start-all.sh
index 88ce430..c545974 100755
--- a/bin/start-all.sh
+++ b/bin/start-all.sh
@@ -28,7 +28,7 @@ else
 fi
 
 # start dfs daemons
-"$bin"/start-dfs.sh --config $HADOOP_CONF_DIR
+# "$bin"/start-dfs.sh --config $HADOOP_CONF_DIR
 
 # start mapred daemons
 "$bin"/start-mapred.sh --config $HADOOP_CONF_DIR
diff --git a/conf/log4j.properties b/conf/log4j.properties
index 1bac90d..2658c37 100644
--- a/conf/log4j.properties
+++ b/conf/log4j.properties
@@ -1,5 +1,5 @@
 # Define some default values that can be overridden by system properties
-hadoop.root.logger=INFO,console
+hadoop.root.logger=DEBUG,console
 hadoop.log.dir=.
 hadoop.log.file=hadoop.log
 
@@ -107,9 +107,11 @@ log4j.logger.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=WARN
 hadoop.metrics.log.level=INFO
 
 #log4j.logger.org.apache.hadoop.mapred.JobTracker=DEBUG
-#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
+log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
+log4j.logger.org.apache.hadoop.mapred.ReduceTask=DEBUG
 #log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
 log4j.logger.org.apache.hadoop.metrics2=${hadoop.metrics.log.level}
+log4j.logger.org.apache.hadoop.mapred.Child=DEBUG
 
 # Jets3t library
 log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR
diff --git a/conf/taskcontroller.cfg b/conf/taskcontroller.cfg
index a13aeb9..ef85fa9 100644
--- a/conf/taskcontroller.cfg
+++ b/conf/taskcontroller.cfg
@@ -1,4 +1,11 @@
-mapred.local.dir=#configured value of mapred.local.dir. It can be a list of comma separated paths.
-hadoop.log.dir=#configured value of hadoop.log.dir.
-mapred.tasktracker.tasks.sleeptime-before-sigkill=#sleep time before sig kill is to be sent to process group after sigterm is sent. Should be in seconds
-mapreduce.tasktracker.group=#configured value of mapreduce.tasktracker.group.
+#configured value of mapred.local.dir. It can be a list of comma separated paths.
+mapred.local.dir=/mnt/lustre/hadoop-tmpdir
+
+#configured value of hadoop.log.dir.
+hadoop.log.dir=/work/lustre/tasks/Hadoop.update/git/hadoop-common/logs
+
+#sleep time before sig kill is to be sent to process group after sigterm is sent. Should be in seconds
+mapred.tasktracker.tasks.sleeptime-before-sigkill=30
+
+#configured value of mapreduce.tasktracker.group.
+mapreduce.tasktracker.group=0
diff --git a/src/mapred/org/apache/hadoop/mapred/ReduceTask.java b/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
index 980b988..076ad08 100644
--- a/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
+++ b/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
@@ -1481,10 +1481,10 @@ class ReduceTask extends Task {
       URLConnection connection = url.openConnection();
       InputStream input = setupSecureConnection(mapOutputLoc, connection);
-
-      // Validate header from map output
+
+      // Validate header from map output
       TaskAttemptID mapId = null;
-      try {
+      try {
         mapId = TaskAttemptID.forName(connection.getHeaderField(FROM_MAP_TASK));
       } catch (IllegalArgumentException ia) {
@@ -1524,6 +1524,23 @@ class ReduceTask extends Task {
         LOG.debug("header: " + mapId + ", compressed len: " + compressedLength +
                   ", decompressed len: " + decompressedLength);
       }
+
+      // Shuffle
+      MapOutput mapOutput = null;
+
+      boolean shuffleLink = conf.getBoolean("mapreduce.shuffle.link", false);
+
+      if (shuffleLink) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Shuffling " + decompressedLength + " bytes (" +
+                    compressedLength + " raw bytes) " +
+                    "to Link from " + mapOutputLoc.getTaskAttemptId());
+        }
+        mapOutput = shuffleToLink(mapOutputLoc, input, filename,
+                                  (int) decompressedLength);
+        return mapOutput;
+      }
 
       //We will put a file in memory if it meets certain criteria:
       //1. The size of the (decompressed) file should be less than 25% of
@@ -1533,8 +1550,6 @@
       // Check if this map-output can be saved in-memory
       boolean shuffleInMemory = ramManager.canFitInMemory(decompressedLength);
 
-      // Shuffle
-      MapOutput mapOutput = null;
       if (shuffleInMemory) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Shuffling " + decompressedLength + " bytes (" +
@@ -1772,6 +1787,48 @@ class ReduceTask extends Task {
       return mapOutput;
     }
 
+    private MapOutput shuffleToLink(MapOutputLocation mapOutputLoc,
+                                    InputStream input,
+                                    Path filename,
+                                    long mapOutputLength)
+    throws IOException {
+      // Find out a suitable location for the output on the local (shared) filesystem
+      Path localFilename =
+        lDirAlloc.getLocalPathForWrite(filename.toUri().getPath(),
+                                       mapOutputLength, conf);
+
+      LOG.debug("shuffleToLink: local file = \"" + localFilename.toString() + "\"");
+
+      MapOutput mapOutput =
+        new MapOutput(mapOutputLoc.getTaskId(), mapOutputLoc.getTaskAttemptId(),
+                      conf, localFileSys.makeQualified(localFilename),
+                      mapOutputLength);
+
+      // The map output URL query carries the job and map attempt ids; use them
+      // to locate the mapper's file.out on the shared mapred.local.dir
+      // (user directory hardcoded to "root" in this draft).
+      String query = mapOutputLoc.getOutputLocation().getQuery();
+      LOG.debug("shuffleToLink: query = \"" + query + "\"");
+
+      String[] queries = query.split("&");
+      String tmpDir = conf.get("hadoop.tmp.dir");
+      String lnCmd = conf.get("hadoop.ln.cmd");
+
+      String src = tmpDir + "/taskTracker/root/jobcache/"
+        + queries[0].substring(queries[0].indexOf('=') + 1) + "/"
+        + queries[1].substring(queries[1].indexOf('=') + 1) + "/output/file.out";
+
+      // Hard-link the map-side output into the reducer's output location
+      // instead of copying it over HTTP.
+      String command = lnCmd + " " + src + " " + localFilename;
+      try {
+        LOG.debug("shuffleToLink: command used for hardlink: " + command);
+        int rc = Runtime.getRuntime().exec(command).waitFor();
+        if (rc != 0) {
+          throw new IOException("hardlink command failed with exit code " + rc +
+                                ": " + command);
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException("Interrupted while waiting for: " + command);
+      }
+
+      return mapOutput;
+    }
+
     private MapOutput shuffleToDisk(MapOutputLocation mapOutputLoc,
                                     InputStream input,
                                     Path filename,
diff --git a/src/mapred/org/apache/hadoop/mapred/TaskRunner.java b/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
index ca54508..c9a9677 100644
--- a/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
+++ b/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
@@ -248,6 +248,7 @@ abstract class TaskRunner extends Thread {
       }
       setupCmds.add(setup);
 
+      LOG.warn("JVM to start: " + setupCmds.toString() + "; workdir = " + workDir);
       launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir);
       tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
       if (exitCodeSet) {
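For completeness, a sketch of how a job could switch the link-based shuffle on. The property names mapreduce.shuffle.link, hadoop.ln.cmd and hadoop.tmp.dir are the ones read by the draft patch; the driver class, values and paths below are hypothetical examples, not part of the patch.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

/** Hypothetical driver showing the knobs read by the draft patch. */
public class LinkShuffleJob {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(LinkShuffleJob.class);
    conf.setJobName("link-shuffle-example");

    // Switch the reducer's copy phase to the hard-link path added in
    // ReduceTask.shuffleToLink() (default is false, i.e. normal HTTP shuffle).
    conf.setBoolean("mapreduce.shuffle.link", true);

    // Command used by shuffleToLink() to create the link; "/bin/ln" is an
    // example value, the patch does not ship a default.
    conf.set("hadoop.ln.cmd", "/bin/ln");

    // hadoop.tmp.dir / mapred.local.dir must point at the shared Lustre
    // mount so the link can reach the map-side file.out (example path).
    conf.set("hadoop.tmp.dir", "/mnt/lustre/hadoop-tmpdir");

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    JobClient.runJob(conf);
  }
}

This uses the old (org.apache.hadoop.mapred) API that hadoop-1.0.0 ships; with no mapper or reducer class set, the identity defaults are enough to exercise the shuffle path.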