From: Michel Segel <michael_segel@hotmail.com>
To: "common-user@hadoop.apache.org" <common-user@hadoop.apache.org>
CC: common-user <common-user@hadoop.apache.org>, Josh Patterson
Date: Fri, 20 Jan 2012 14:18:47 -0600
Subject: Re: Problems with timeout when a Hadoop job generates a large number of key-value pairs

Steve,
If you want me to debug your code, I'll be glad to set up a billable contract... ;-)
What I am willing to do is to help you debug your code...

Did you time how long it takes in the Mapper.map() method?
The reason I ask is to first confirm that you are failing within a map() method. It could be that you're just not updating your status...

You said that you are writing many output records for a single input.
So let's take a look at your code. Are all writes of the same length? Meaning that in each iteration of Mapper.map() you will always write K rows? If so, ask yourself why some iterations are taking longer and longer.

Note: I'm assuming that the time for each iteration is longer than the previous... Or am I missing something?

-Mike

Sent from a remote device. Please excuse any typos...

Mike Segel

On Jan 20, 2012, at 11:16 AM, Steve Lewis wrote:

> We have been having problems with mappers timing out after 600 sec when the
> mapper writes many more, say thousands of records for every
> input record - even when the code in the mapper is small and fast.
> I have no idea what could cause the system to be so slow and am reluctant
> to raise the 600 sec limit without understanding why there should be a
> timeout when all MY code is very fast.
>
> I am enclosing a small sample which illustrates the problem. It will
> generate a 4GB text file on hdfs if the input file does not exist or is not
> at least that size, and this will take some time (hours in my configuration)
> - then the code is essentially wordcount, but instead of finding and
> emitting words, the mapper emits all substrings of the input data - this
> generates a much larger output data volume and number of output records than
> wordcount generates.
> Still, the amount of data emitted is no larger than other data sets I know
> Hadoop can handle.
>
> All mappers on my 8 node cluster eventually time out after 600 sec - even
> though I see nothing in the code which is even a little slow, and I suspect
> that any slow behavior is in the called Hadoop code. This is similar to a
> problem we have in bioinformatics where a colleague saw timeouts on his 50
> node cluster.
>
> I would appreciate any help from the group. Note - if you have a text file
> of at least 4 GB the program will take that as an input without trying to
> create its own file.
>
> /*
> ==========================================================================
> */
> import org.apache.hadoop.conf.*;
> import org.apache.hadoop.fs.*;
> import org.apache.hadoop.io.*;
> import org.apache.hadoop.mapreduce.*;
> import org.apache.hadoop.mapreduce.lib.input.*;
> import org.apache.hadoop.mapreduce.lib.output.*;
> import org.apache.hadoop.util.*;
>
> import java.io.*;
> import java.util.*;
>
> /**
>  * org.systemsbiology.hadoop.SubstringGenerator
>  *
>  * This illustrates an issue we are having where a mapper generating a
>  * much larger volume of data and number of records times out even though
>  * the code is small, simple and fast.
>  *
>  * NOTE!!! as written the program will generate a 4GB file in hdfs with
>  * good input data - this is done only if the file does not exist but may
>  * take several hours. It will only be done once. After that the failure
>  * is fairly fast.
>  *
>  * What this will do is count unique substrings of lines of length
>  * between MIN_SUBSTRING_LENGTH and MAX_SUBSTRING_LENGTH by generating all
>  * substrings and then using the word count algorithm.
>  * What is interesting is that the number and volume of writes in the
>  * map phase is MUCH larger than the number of reads and the volume of
>  * read data.
>  *
>  * The example is artificial but similar to some real BioInformatics
>  * problems - for example finding all substrings in a genome can be
>  * important for the design of microarrays.
>  *
>  * While the real problem is more complex - the problem is that
>  * when the input file is large enough the mappers time out, failing to
>  * report after 600 sec.
>  * There is nothing slow in any of the application code and nothing I
>  */
> public class SubstringCount extends Configured implements Tool {
>
>     public static final long ONE_MEG = 1024 * 1024;
>     public static final long ONE_GIG = 1024 * ONE_MEG;
>     public static final int LINE_LENGTH = 100;
>     public static final Random RND = new Random();
>
>     // NOTE - edit this line to be a sensible location in the current file system
>     public static final String INPUT_FILE_PATH = "BigInputLines.txt";
>     // NOTE - edit this line to be a sensible location in the current file system
>     public static final String OUTPUT_FILE_PATH = "output";
>     // NOTE - edit this line to be the input file size - 4 * ONE_GIG should be large but not a problem
>     public static final long DESIRED_LENGTH = 4 * ONE_GIG;
>     // NOTE - limits on substring length
>     public static final int MINIMUM_LENGTH = 5;
>     public static final int MAXIMUM_LENGTH = 32;
>
>     /**
>      * create an input file to read
>      * @param fs !null file system
>      * @param p  !null path
>      * @throws IOException on error
>      */
>     public static void guaranteeInputFile(FileSystem fs, Path p) throws IOException {
>         if (fs.isFile(p)) {
>             FileStatus fileStatus = fs.getFileStatus(p);
>             if (fileStatus.getLen() >= DESIRED_LENGTH)
>                 return;
>         }
>         FSDataOutputStream open = fs.create(p);
>         PrintStream ps = new PrintStream(open);
>         long showInterval = DESIRED_LENGTH / 100;
>         for (long i = 0; i < DESIRED_LENGTH; i += LINE_LENGTH) {
>             writeRandomLine(ps, LINE_LENGTH);
>             // show progress
>             if (i % showInterval == 0) {
>                 System.err.print(".");
>             }
>         }
>         System.err.println("");
>         ps.close();
>     }
>
>     /**
>      * write a line with a random string of capital letters
>      *
>      * @param pPs         output
>      * @param pLineLength length of the line
>      */
>     public static void writeRandomLine(final PrintStream pPs, final int pLineLength) {
>         StringBuilder sb = new StringBuilder();
>         for (int i = 0; i < pLineLength; i++) {
>             char c = (char) ('A' + RND.nextInt(26));
>             sb.append(c);
>         }
>         String s = sb.toString();
>         pPs.println(s);
>     }
>
>     /**
>      * Construct a Configured.
>      */
>     public SubstringCount() {
>     }
>
>     /**
>      * similar to the Word Count mapper but one line generates a lot more output
>      */
>     public static class SubStringMapper
>             extends Mapper<Object, Text, Text, IntWritable> {
>
>         /**
>          * generate an array of substrings
>          *
>          * @param inp       input long string
>          * @param minLength minimum substring length
>          * @param maxLength maximum substring length
>          * @return !null array of strings
>          */
>         public static String[] generateSubStrings(String inp, int minLength, int maxLength) {
>             List<String> holder = new ArrayList<String>();
>             for (int start = 0; start < inp.length() - minLength; start++) {
>                 for (int end = start + minLength; end < Math.min(inp.length(), start + maxLength); end++) {
>                     try {
>                         holder.add(inp.substring(start, end));
>                     }
>                     catch (Exception e) {
>                         throw new RuntimeException(e);
>                     }
>                 }
>             }
>
>             String[] ret = new String[holder.size()];
>             holder.toArray(ret);
>             return ret;
>         }
>
>         private final IntWritable one = new IntWritable(1);
>         private final Text word = new Text();
>
>         /**
>          * Like word count except the words are all substrings of the input data.
>          * This leads to a large increase
>          * @param key     irrelevant
>          * @param value   one read line
>          * @param context !null context
>          */
>         public void map(Object key, Text value, Context context)
>                 throws IOException, InterruptedException {
>             String inp = value.toString();
>             // The written data is hundreds of times the input data
>             String[] data = generateSubStrings(inp, MINIMUM_LENGTH, MAXIMUM_LENGTH);
>             for (int i = 0; i < data.length; i++) {
>                 String s = data[i];
>                 word.set(s);
>                 context.write(word, one);
>             }
>         }
>     }
>
>     /**
>      * Essentially the same reducer used by word count
>      */
>     public static class IntSumReducer
>             extends Reducer<Text, IntWritable, Text, IntWritable> {
>         private final IntWritable result = new IntWritable();
>
>         public void reduce(Text key, Iterable<IntWritable> values, Context context)
>                 throws IOException, InterruptedException {
>             int sum = 0;
>             for (IntWritable val : values) {
>                 sum += val.get();
>             }
>             result.set(sum);
>             context.write(key, result);
>         }
>     }
>
>     /**
>      * kill a directory and all contents
>      * useful to make sure the output directory is empty
>      * @param src !null path of a directory
>      * @param fs  !null file system
>      * @return true on success
>      */
>     public static boolean expunge(Path src, FileSystem fs) {
>         try {
>             if (!fs.exists(src))
>                 return true;
>             // break these out
>             if (fs.getFileStatus(src).isDir()) {
>                 boolean doneOK = fs.delete(src, true);
>                 doneOK = !fs.exists(src);
>                 return doneOK;
>             }
>             if (fs.isFile(src)) {
>                 boolean doneOK = fs.delete(src, false);
>                 return doneOK;
>             }
>             throw new IllegalStateException("should be file or directory if it exists");
>         }
>         catch (IOException e) {
>             throw new RuntimeException(e);
>         }
>     }
>
>     /**
>      * this implementation of run allows the program to start with a Configuration
>      * which may have been filled by other code
>      * @param conf !null conf
>      * @param args !null arguments
>      * @return 0 on success
>      * @throws Exception on error
>      */
>     public int runJob(Configuration conf, String[] args) throws Exception {
>         String[] realArgs = new String[2];
>
>         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
>
>         if (otherArgs.length > 0)
>             realArgs[0] = otherArgs[0];
>         else
>             realArgs[0] = INPUT_FILE_PATH;
>         if (otherArgs.length > 1)
>             realArgs[1] = otherArgs[1];
>         else
>             realArgs[1] = OUTPUT_FILE_PATH;
>
>         Path InputPath = new Path(realArgs[0]);
>         Path outputDir = new Path(realArgs[1]);
>
>         Job job = new Job(conf, "Substring Generator");
>         conf = job.getConfiguration(); // NOTE the Job copies the configuration
>         job.setJarByClass(SubstringCount.class);
>         job.setMapperClass(SubStringMapper.class);
>         job.setCombinerClass(IntSumReducer.class);
>         job.setReducerClass(IntSumReducer.class);
>
>         job.setMapOutputKeyClass(Text.class);
>         job.setMapOutputValueClass(IntWritable.class);
>         job.setOutputKeyClass(Text.class);
>         job.setOutputValueClass(IntWritable.class);
>
>         job.setNumReduceTasks(72); // todo size for your cluster
>         FileInputFormat.addInputPath(job, InputPath);
>
>         FileSystem fileSystem = outputDir.getFileSystem(conf);
>         // make sure the output directory is empty
>         expunge(outputDir, fileSystem); // make sure this does not exist
>         FileOutputFormat.setOutputPath(job, outputDir);
>
>         // Now create something for the job to read
>         guaranteeInputFile(fileSystem, InputPath);
>
>         boolean ans = job.waitForCompletion(true);
>         int ret = ans ? 0 : 1;
>         return ret;
>     }
>
>     /**
>      * Execute the command with the given arguments.
>      *
>      * @param args command specific arguments.
>      * @return exit code.
>      * @throws Exception on error
>      */
>     @Override
>     public int run(final String[] args) throws Exception {
>         Configuration conf = new Configuration();
>         return runJob(conf, args);
>     }
>
>     /**
>      * @param args args[0] is the path to a file to be created in the FileSystem;
>      *             args[1] is the path to an output directory in the file system - the
>      *             contents WILL be deleted
>      * @throws Exception on error
>      */
>     public static void main(String[] args) throws Exception {
>         ToolRunner.run(new SubstringCount(), args);
>     }
> }
>
> --
> Steven M. Lewis PhD
> 4221 105th Ave NE
> Kirkland, WA 98033
> 206-384-1340 (cell)
> Skype lordjoe_com
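A minimal sketch of the "update your status" suggestion above, assuming the
same new-API Mapper setup as the posted SubstringCount: calling
Context.progress() (and optionally Context.setStatus()) inside the write
loop tells the TaskTracker the task is still alive, so it is not killed at
the default 600-second mapred.task.timeout. ReportingSubStringMapper and the
10,000-write reporting interval are illustrative assumptions, not code from
the original post.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ReportingSubStringMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final IntWritable one = new IntWritable(1);
    private final Text word = new Text();
    private long writes = 0;

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Same substring expansion as the posted SubStringMapper
        String[] data = SubstringCount.SubStringMapper.generateSubStrings(
                value.toString(), SubstringCount.MINIMUM_LENGTH, SubstringCount.MAXIMUM_LENGTH);
        for (String s : data) {
            word.set(s);
            context.write(word, one);
            // Report liveness periodically; a task that shows no progress is
            // killed after mapred.task.timeout (600 s by default).
            if (++writes % 10000 == 0) {
                context.progress();
                context.setStatus("wrote " + writes + " substrings");
            }
        }
    }
}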