hadoop-common-user mailing list archives

From Michel Segel <michael_se...@hotmail.com>
Subject Re: Problems with timeout when a Hadoop job generates a large number of key-value pairs
Date Fri, 20 Jan 2012 20:18:47 GMT
Steve,
If you want me to debug your code, I'll be glad to set up a billable contract... ;-)

What I am willing to do is to help you to debug your code...

Did you time how long it takes in the Mapper.map() method?
The reason I ask is to first confirm that you are actually failing within a map() call.
It could be that you're just not updating your status...
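
For example - and this is only a rough sketch against the map() you posted, not a
claim about where your time is actually going - reporting progress from inside a
long-running map() looks something like this with the new-API Context:

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] data = generateSubStrings(value.toString(), MINIMUM_LENGTH, MAXIMUM_LENGTH);
        for (int i = 0; i < data.length; i++) {
            word.set(data[i]);
            context.write(word, one);
            // heartbeat: without some sign of life a long loop can trip the
            // 600 sec task timeout even though the code itself is fast
            if (i % 10000 == 0) {
                context.progress();
                context.setStatus("wrote " + i + " substrings");
            }
        }
    }

If the timeouts disappear with something like that in place, the task was alive
but silent; if they don't, the time is being spent somewhere else.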

You said that you are writing many output records for a single input. 

So let's take a look at your code.
Are all writes of the same length? That is, does each iteration of Mapper.map()
always write K rows?

If so, ask yourself why some iterations are taking longer and longer.

Note: I'm assuming that each iteration is taking longer than the previous one...
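
If you haven't measured it yet, a crude way - just a sketch, using counters so the
numbers show up in the job UI - is to bracket the body of map() like this:

        long start = System.nanoTime();
        // ... the existing substring-generating loop ...
        long elapsedMs = (System.nanoTime() - start) / 1000000;
        context.getCounter("timing", "map calls").increment(1);
        context.getCounter("timing", "map ms total").increment(elapsedMs);
        if (elapsedMs > 10000) {
            context.getCounter("timing", "map calls over 10 sec").increment(1);
        }

If the per-call numbers stay small while the task still dies at 600 sec, the stall
is outside your map() body - reading the split, or writes blocking while the map
output buffer spills - rather than in your own code.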

Or am I missing something?
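
One more thought: if it turns out everything is healthy and the job simply needs
more than 10 minutes between progress reports, the limit itself is a config knob -
mapred.task.timeout, in milliseconds (mapreduce.task.timeout on newer releases).
For example:

    conf.setLong("mapred.task.timeout", 1800000L);   // 30 minutes instead of the 600 sec default

But I agree with you that raising it without understanding the stall just hides
the problem.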

-Mike

Sent from a remote device. Please excuse any typos...

Mike Segel

On Jan 20, 2012, at 11:16 AM, Steve Lewis <lordjoe2000@gmail.com> wrote:

> We have been having problems with mappers timing out after 600 sec when the
> mapper writes many more records - say thousands - for every input record,
> even when the code in the mapper is small and fast. I have no idea what could
> cause the system to be so slow and am reluctant to raise the 600 sec limit
> without understanding why there should be a timeout when all MY code is very
> fast.
> 
> I am enclosing a small sample which illustrates the problem. It will
> generate a 4GB text file on HDFS if the input file does not exist or is not
> at least that size, and this will take some time (hours in my configuration).
> The code is then essentially wordcount, but instead of finding and emitting
> words the mapper emits all substrings of the input data, which generates far
> more output data and output records than wordcount does.
> Still, the amount of data emitted is no larger than other data sets I know
> Hadoop can handle.
> 
> All mappers on my 8-node cluster eventually time out after 600 sec, even
> though I see nothing in the code that is even a little slow, and I suspect
> that any slow behavior is in the Hadoop code being called. This is similar
> to a problem we have in bioinformatics, where a colleague saw timeouts on
> his 50-node cluster.
> 
> I would appreciate any help from the group. Note - if you have a text file of
> at least 4 GB, the program will take that as an input without trying to
> create its own file.
> /*
> ============================================================================================
> */
> import org.apache.hadoop.conf.*;
> import org.apache.hadoop.fs.*;
> import org.apache.hadoop.io.*;
> import org.apache.hadoop.mapreduce.*;
> import org.apache.hadoop.mapreduce.lib.input.*;
> import org.apache.hadoop.mapreduce.lib.output.*;
> import org.apache.hadoop.util.*;
> 
> import java.io.*;
> import java.util.*;
> /**
>  * org.systemsbiology.hadoop.SubstringGenerator
>  *
>  * This illustrates an issue we are having where a mapper generating a much
>  * larger volume of data and number of records times out, even though the
>  * code is small, simple and fast.
>  *
>  * NOTE!!! As written the program will generate a 4GB file in HDFS with good
>  * input data - this is done only if the file does not exist, but it may take
>  * several hours. It will only be done once. After that the failure is fairly
>  * fast.
>  *
>  * What this will do is count unique substrings of lines, of length between
>  * MINIMUM_LENGTH and MAXIMUM_LENGTH, by generating all substrings and then
>  * using the word count algorithm.
>  * What is interesting is that the number and volume of writes in the map
>  * phase is MUCH larger than the number of reads and the volume of read data.
>  *
>  * The example is artificial but similar to some real bioinformatics problems -
>  * for example, finding all substrings in a genome can be important for the
>  * design of microarrays.
>  *
>  * While the real problem is more complex, the issue is that when the input
>  * file is large enough the mappers time out, failing to report after 600 sec.
>  * There is nothing slow in any of the application code.
>  */
> public class SubstringCount extends Configured implements Tool {
> 
> 
>    public static final long ONE_MEG = 1024 * 1024;
>    public static final long ONE_GIG = 1024 * ONE_MEG;
>    public static final int LINE_LENGTH = 100;
>    public static final Random RND = new Random();
> 
>    // NOTE - edit this line to be a sensible location in the current file system
>    public static final String INPUT_FILE_PATH = "BigInputLines.txt";
>    // NOTE - edit this line to be a sensible location in the current file system
>    public static final String OUTPUT_FILE_PATH = "output";
>    // NOTE - edit this line to be the input file size - 4 * ONE_GIG should be large but not a problem
>    public static final long DESIRED_LENGTH = 4 * ONE_GIG;
>    public static final long DESIRED_LENGTH = 4 * ONE_GIG;
>    // NOTE - limits on substring length
>    public static final int MINIMUM_LENGTH = 5;
>    public static final int MAXIMUM_LENGTH = 32;
> 
> 
>    /**
>     * create an input file to read
>     * @param fs !null file system
>     * @param p  !null path
>     * @throws IOException on error
>     */
>    public static void guaranteeInputFile(FileSystem fs, Path p) throws IOException {
>        if (fs.isFile(p)) {
>            FileStatus fileStatus = fs.getFileStatus(p);
>            if (fileStatus.getLen() >= DESIRED_LENGTH)
>                return;
>        }
>        FSDataOutputStream open = fs.create(p);
>        PrintStream ps = new PrintStream(open);
>         long showInterval = DESIRED_LENGTH  / 100;
>        for (long i = 0; i < DESIRED_LENGTH; i += LINE_LENGTH) {
>            writeRandomLine(ps, LINE_LENGTH);
>            // show progress roughly every 1% of the target size
>            if (i % showInterval < LINE_LENGTH)  {
>                System.err.print(".");
> 
>            }
>        }
>        System.err.println("");
>        ps.close();
>    }
> 
>    /**
>     * write a line with a random string of capital letters
>     *
>     * @param pPs         -  output
>     * @param pLineLength length of the line
>     */
>    public static void writeRandomLine(final PrintStream pPs, final int pLineLength) {
>        StringBuilder sb = new StringBuilder();
>        for (int i = 0; i < pLineLength; i++) {
>            char c = (char) ('A' + RND.nextInt(26));
>            sb.append(c);
> 
>        }
>        String s = sb.toString();
>        pPs.println(s);
>    }
> 
> 
> 
>    /**
>     * Construct a SubstringCount.
>     */
>    public SubstringCount() {
>    }
> 
> 
> 
>    /**
>     * similar to the Word Count mapper but one line of input generates a lot more output
>     */
>    public static class SubStringMapper
>            extends Mapper<Object, Text, Text, IntWritable> {
> 
>        /**
>         * generate an array of substrings
>         *
>         * @param inp       input long string
>         * @param minLength minimum substring length
>         * @param maxLength maximum substring length
>         * @return !null array of strings
>         */
>        public static String[] generateSubStrings(String inp, int minLength, int maxLength) {
>            List<String> holder = new ArrayList<String>();
>            for (int start = 0; start < inp.length() - minLength; start++) {
>                for (int end = start + minLength; end < Math.min(inp.length(), start + maxLength); end++) {
>                    try {
>                        holder.add(inp.substring(start, end));
>                    }
>                    catch (Exception e) {
>                        throw new RuntimeException(e);
> 
>                    }
>                }
>            }
> 
>            String[] ret = new String[holder.size()];
>            holder.toArray(ret);
>             return ret;
>        }
> 
>        private final IntWritable one = new IntWritable(1);
>        private final Text word = new Text();
> 
>        /**
>         * Like word count except the words are all substrings of the input data.
>         * This leads to a large increase in output volume.
>         * @param key  irrelevant
>         * @param value  one read line
>         * @param context  !null context
>         */
>        public void map(Object key, Text value, Context context
>        ) throws IOException, InterruptedException {
>            String inp = value.toString();
>            // The written data is hundreds of times the input data
>            String[] data = generateSubStrings(inp, MINIMUM_LENGTH, MAXIMUM_LENGTH);
>            for (int i = 0; i < data.length; i++) {
>                String s = data[i];
>                word.set(s);
>                context.write(word, one);
>            }
>        }
>    }
> 
>    /**
>     * Essentially the same reducer used by word count
>     */
>    public static class IntSumReducer
>            extends Reducer<Text, IntWritable, Text, IntWritable> {
>        private final IntWritable result = new IntWritable();
> 
>        public void reduce(Text key, Iterable<IntWritable> values,
>                           Context context
>        ) throws IOException, InterruptedException {
>            int sum = 0;
>            for (IntWritable val : values) {
>                sum += val.get();
>            }
>            result.set(sum);
>            context.write(key, result);
>        }
>    }
> 
> 
>    /**
>     * kill a directory and all contents
>     * useful to make sure the output directory is empty
>     * @param src !null path of a directory
>     * @param fs !null file system
>     * @return  true on success
>     */
>    public static boolean expunge(Path src, FileSystem fs) {
>        try {
>            if (!fs.exists(src))
>                return true;
>            // break these out
>            if (fs.getFileStatus(src).isDir()) {
>                boolean doneOK = fs.delete(src, true);
>                doneOK = !fs.exists(src);
>                return doneOK;
>            }
>            if (fs.isFile(src)) {
>                boolean doneOK = fs.delete(src, false);
>                return doneOK;
>            }
>            throw new IllegalStateException("should be file or directory if it exists");
>        }
>        catch (IOException e) {
>            throw new RuntimeException(e);
>        }
> 
>    }
> 
>    /**
>     * this implementation of run allows the program to start with a Configuration
>     * which may have been filled in by other code
>     * @param conf !null conf
>     * @param args  !null arguments
>     * @return 0 on success
>     * @throws Exception  on error
>     */
>    public int runJob(Configuration conf, String[] args) throws Exception {
>        String[] realArgs = new String[2];
> 
>        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
> 
>        if (otherArgs.length > 0)
>            realArgs[0] = otherArgs[0];
>        else
>            realArgs[0] = INPUT_FILE_PATH;
>        if (otherArgs.length > 1)
>            realArgs[1] = otherArgs[1];
>        else
>            realArgs[1] = OUTPUT_FILE_PATH;
> 
>        Path inputPath = new Path(realArgs[0]);
>        Path outputDir = new Path(realArgs[1]);
> 
>        Job job = new Job(conf, "Substring Generator");
>        conf = job.getConfiguration(); // NOTE Job copies the configuration
>        job.setJarByClass(SubstringCount.class);
>        job.setMapperClass(SubStringMapper.class);
>        job.setCombinerClass(IntSumReducer.class);
>        job.setReducerClass(IntSumReducer.class);
> 
>        job.setMapOutputKeyClass(Text.class);
>        job.setMapOutputValueClass(IntWritable.class);
>        job.setOutputKeyClass(Text.class);
>        job.setOutputValueClass(IntWritable.class);
> 
>         job.setNumReduceTasks(72); // todo size for your cluster
>        FileInputFormat.addInputPath(job, inputPath);
> 
> 
>        FileSystem fileSystem = outputDir.getFileSystem(conf);
>        // make sure the output directory is empty
>        expunge(outputDir, fileSystem);    // make sure this does not exist
>        FileOutputFormat.setOutputPath(job, outputDir);
> 
>        // Now create something for the job to read
>        guaranteeInputFile(fileSystem, inputPath);
> 
> 
>        boolean ans = job.waitForCompletion(true);
>        int ret = ans ? 0 : 1;
>        return ret;
>    }
> 
> 
>    /**
>     * Execute the command with the given arguments.
>     *
>     * @param args command specific arguments.
>     * @return exit code.
>     * @throws Exception
>     */
>    @Override
>    public int run(final String[] args) throws Exception {
>        Configuration conf = new Configuration();
>        return runJob(conf, args);
>    }
> 
>    /**
>     * @param args  args[0] is the path to a file to be created in the FileSystem;
>     *              args[1] is the path to an output directory in the file system - the contents WILL be deleted
>     * @throws Exception  on error
>     */
>    public static void main(String[] args) throws Exception {
>        ToolRunner.run(new SubstringCount(), args);
>    }
> }
> 
> -- 
> Steven M. Lewis PhD
> 4221 105th Ave NE
> Kirkland, WA 98033
> 206-384-1340 (cell)
> Skype lordjoe_com
