hadoop-common-user mailing list archives

From Steve Lewis <lordjoe2...@gmail.com>
Subject Re: Problems with timeout when a Hadoop job generates a large number of key-value pairs
Date Fri, 20 Jan 2012 22:01:52 GMT
One thing I can say for sure is that generateSubStrings() is not slow:
every input line in my sample is 100 characters, so the timing should be
very similar from one run to the next.
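
A quick way to check that offline - a throwaway timing loop, assuming the
SubstringCount listing quoted at the end of this thread is on the classpath:

import java.util.Random;

public class SubstringTiming {
    public static void main(String[] args) {
        // build one 100-character line of random capitals, like the job input
        Random rnd = new Random(42);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 100; i++)
            sb.append((char) ('A' + rnd.nextInt(26)));
        String line = sb.toString();

        // time 10,000 calls - the substring generation for ~1 MB of input
        long start = System.nanoTime();
        long total = 0;
        for (int i = 0; i < 10000; i++)
            total += SubstringCount.SubStringMapper
                    .generateSubStrings(line, SubstringCount.MINIMUM_LENGTH,
                            SubstringCount.MAXIMUM_LENGTH).length;
        System.out.println(total + " substrings in "
                + (System.nanoTime() - start) / 1000000 + " ms");
    }
}

I would expect this to finish in seconds, not minutes, which is why I don't
believe the map-side code is the bottleneck.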

This sample is a simplification of a more complex real problem where we
see timeouts when a map generates significantly more records than it
reads - thousands for each input record. I see nothing to suggest that my
code is slow, and I strongly suspect the problem is internal to Hadoop.
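
For reference, the workaround Vinod suggests below is easy to retrofit -
a minimal sketch, where the 1000-record interval is an arbitrary choice of
mine and expensiveExpansion() is a hypothetical stand-in for the real
per-record work:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ProgressReportingMapper
        extends Mapper<Object, Text, Text, IntWritable> {
    private static final int PROGRESS_INTERVAL = 1000;
    private final IntWritable one = new IntWritable(1);
    private final Text word = new Text();
    private long emitted = 0;

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String s : expensiveExpansion(value.toString())) {
            word.set(s);
            context.write(word, one);
            // context.write() should already count as progress; the explicit
            // progress() call also covers long stretches with no writes
            if (++emitted % PROGRESS_INTERVAL == 0)
                context.progress();
        }
    }

    // hypothetical stand-in for the real per-record expansion
    private String[] expensiveExpansion(String line) {
        return new String[]{line};
    }
}

Note, though, that our map calls context.write() constantly, so if writes
count as progress I would not expect the timeout in the first place.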

On Fri, Jan 20, 2012 at 1:47 PM, Vinod Kumar Vavilapalli
<vinodkv@hortonworks.com> wrote:

> Every so often, you should do a context.progress() so that the
> framework knows that this map is doing useful work. That will prevent
> the framework from killing it after 10 mins. The framework
> automatically does this every time you do a
> context.write()/context.setStatus(), but if the map is stuck for 10
> mins while processing some keys (maybe in generateSubStrings()), that
> may lead to a timeout.
>
> HTH,
> +Vinod
>
>
> On Fri, Jan 20, 2012 at 9:16 AM, Steve Lewis <lordjoe2000@gmail.com> wrote:
> > We have been having problems with mappers timing out after 600 sec when the
> > mapper writes many more records than it reads - say thousands for every
> > input record - even when the code in the mapper is small and fast. I have
> > no idea what could cause the system to be so slow, and I am reluctant to
> > raise the 600 sec limit without understanding why there should be a
> > timeout when all MY code is very fast.
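> >
> > (For reference, the limit in question is mapred.task.timeout, expressed in
> > milliseconds. If one did want to raise it - treating the symptom rather
> > than the cause - it would look something like this in the driver:
> >
> >    conf.setLong("mapred.task.timeout", 20 * 60 * 1000L);  // 20 minutes
> >
> > or per run via -Dmapred.task.timeout=1200000 through GenericOptionsParser.)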
> >
> > I am enclosing a small sample which illustrates the problem. It will
> > generate a 4 GB text file on HDFS if the input file does not exist or is
> > not at least that size - this will take some time (hours in my
> > configuration). The job itself is essentially wordcount, but instead of
> > finding and emitting words the mapper emits all substrings of the input
> > data - this generates far more output data and far more output records
> > than wordcount does. Still, the amount of data emitted is no larger than
> > other data sets I know Hadoop can handle.
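> >
> > (Rough arithmetic on the blow-up: with MINIMUM_LENGTH = 5 and
> > MAXIMUM_LENGTH = 32, a 100-character line has 95 valid start positions
> > with up to 27 lengths each - about 2,200 substrings per line. A 4 GB
> > input is roughly 40 million lines, so the map phase emits on the order
> > of 10^11 key-value pairs and, at ~20 bytes per record, a few hundred
> > times the input volume.)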
> >
> > All mappers on my 8 node cluster eventually time out after 600 sec - even
> > though I see nothing in the code which is even a little slow - and I
> > suspect that any slow behavior is in the called Hadoop code. This is
> > similar to a problem we have in bioinformatics, where a colleague saw
> > timeouts on his 50 node cluster.
> >
> > I would appreciate any help from the group. Note - if you have a text
> > file of at least 4 GB the program will take that as an input without
> > trying to create its own file.
> > /*
> > ============================================================================================
> > */
> > import org.apache.hadoop.conf.*;
> > import org.apache.hadoop.fs.*;
> > import org.apache.hadoop.io.*;
> > import org.apache.hadoop.mapreduce.*;
> > import org.apache.hadoop.mapreduce.lib.input.*;
> > import org.apache.hadoop.mapreduce.lib.output.*;
> > import org.apache.hadoop.util.*;
> >
> > import java.io.*;
> > import java.util.*;
> >  /**
> >  * org.systemsbiology.hadoop.SubstringGenerator
> >  *
> >  * This illustrates an issue we are having where a mapper generating a much
> >  * larger volume of data and number of records times out even though the
> >  * code is small, simple and fast.
> >  *
> >  * NOTE!!! as written the program will generate a 4GB file in HDFS with
> >  * good input data - this is done only if the file does not exist, but it
> >  * may take several hours. It will only be done once. After that the
> >  * failure is fairly fast.
> >  *
> >  * What this will do is count unique substrings of the input lines, with
> >  * substring lengths between MINIMUM_LENGTH and MAXIMUM_LENGTH, by
> >  * generating all substrings and then using the word count algorithm.
> >  * What is interesting is that the number and volume of writes in the map
> >  * phase is MUCH larger than the number of reads and the volume of read
> >  * data.
> >  *
> >  * The example is artificial but similar to some real bioinformatics
> >  * problems - for example, finding all substrings in a genome can be
> >  * important for the design of microarrays.
> >  *
> >  * While the real problem is more complex, the issue is that when the
> >  * input file is large enough the mappers time out, failing to report
> >  * after 600 sec. There is nothing slow in any of the application code,
> >  * and nothing I can see that should cause a timeout.
> >  */
> > public class SubstringCount extends Configured implements Tool {
> >
> >
> >    public static final long ONE_MEG = 1024 * 1024;
> >    public static final long ONE_GIG = 1024 * ONE_MEG;
> >    public static final int LINE_LENGTH = 100;
> >    public static final Random RND = new Random();
> >
> >    // NOTE - edit this line to be a sensible location in the current file system
> >    public static final String INPUT_FILE_PATH = "BigInputLines.txt";
> >    // NOTE - edit this line to be a sensible location in the current file system
> >    public static final String OUTPUT_FILE_PATH = "output";
> >    // NOTE - edit this line to set the input file size - 4 * ONE_GIG should be large but not a problem
> >    public static final long DESIRED_LENGTH = 4 * ONE_GIG;
> >    // NOTE - limits on substring length
> >    public static final int MINIMUM_LENGTH = 5;
> >    public static final int MAXIMUM_LENGTH = 32;
> >
> >
> >    /**
> >     * create an input file to read
> >     * @param fs !null file system
> >     * @param p  !null path
> >     * @throws IOException on error
> >     */
> >    public static void guaranteeInputFile(FileSystem fs, Path p) throws IOException {
> >        if (fs.isFile(p)) {
> >            FileStatus fileStatus = fs.getFileStatus(p);
> >            if (fileStatus.getLen() >= DESIRED_LENGTH)
> >                return;
> >        }
> >        FSDataOutputStream open = fs.create(p);
> >        PrintStream ps = new PrintStream(open);
> >        // round the report interval to a multiple of LINE_LENGTH so the
> >        // modulus test below can actually fire
> >        long showInterval = (DESIRED_LENGTH / 100 / LINE_LENGTH) * LINE_LENGTH;
> >        for (long i = 0; i < DESIRED_LENGTH; i += LINE_LENGTH) {
> >            writeRandomLine(ps, LINE_LENGTH);
> >            // show progress
> >            if (i % showInterval == 0) {
> >                System.err.print(".");
> >            }
> >        }
> >        System.err.println("");
> >        ps.close();
> >    }
> >
> >    /**
> >     * write a line with a random string of capital letters
> >     *
> >     * @param pPs         output
> >     * @param pLineLength length of the line
> >     */
> >    public static void writeRandomLine(final PrintStream pPs, final int pLineLength) {
> >        StringBuilder sb = new StringBuilder();
> >        for (int i = 0; i < pLineLength; i++) {
> >            char c = (char) ('A' + RND.nextInt(26));
> >            sb.append(c);
> >        }
> >        String s = sb.toString();
> >        pPs.println(s);
> >    }
> >
> >
> >
> >    /**
> >     * Construct a Configured.
> >     */
> >    public SubstringCount() {
> >    }
> >
> >
> >
> >    /**
> >     * similar to the Word Count mapper but one line generates a lot more
> > output
> >     */
> >    public static class SubStringMapper
> >            extends Mapper<Object, Text, Text, IntWritable> {
> >
> >        /**
> >         * generate an array of substrings
> >         *
> >         * @param inp       input long string
> >         * @param minLength minimum substring length
> >         * @param maxLength maximum substring length
> >         * @return !null array of strings
> >         */
> >        public static String[] generateSubStrings(String inp, int minLength, int maxLength) {
> >            List<String> holder = new ArrayList<String>();
> >            for (int start = 0; start < inp.length() - minLength; start++) {
> >                for (int end = start + minLength; end < Math.min(inp.length(), start + maxLength); end++) {
> >                    try {
> >                        holder.add(inp.substring(start, end));
> >                    }
> >                    catch (Exception e) {
> >                        throw new RuntimeException(e);
> >                    }
> >                }
> >            }
> >
> >            String[] ret = new String[holder.size()];
> >            holder.toArray(ret);
> >            return ret;
> >        }
> >
> >        private final IntWritable one = new IntWritable(1);
> >        private final Text word = new Text();
> >
> >        /**
> >         * Like word count, except the words are all substrings of the input
> >         * data. This leads to a large increase in the number and volume of
> >         * output records.
> >         * @param key  irrelevant
> >         * @param value  one read line
> >         * @param context  !null context
> >         */
> >        public void map(Object key, Text value, Context context
> >        ) throws IOException, InterruptedException {
> >            String inp = value.toString();
> >            // The written data is hundreds of times the input data
> >            String[] data = generateSubStrings(inp, MINIMUM_LENGTH, MAXIMUM_LENGTH);
> >            for (int i = 0; i < data.length; i++) {
> >                String s = data[i];
> >                word.set(s);
> >                context.write(word, one);
> >            }
> >        }
> >    }
> >
> >    /**
> >     * Essentially the same reducer used by word count
> >     */
> >    public static class IntSumReducer
> >            extends Reducer<Text, IntWritable, Text, IntWritable> {
> >        private final IntWritable result = new IntWritable();
> >
> >        public void reduce(Text key, Iterable<IntWritable> values,
> >                           Context context
> >        ) throws IOException, InterruptedException {
> >            int sum = 0;
> >            for (IntWritable val : values) {
> >                sum += val.get();
> >            }
> >            result.set(sum);
> >            context.write(key, result);
> >        }
> >    }
> >
> >
> >    /**
> >     * kill a directory and all contents
> >     * useful to make sure the output directory is empty
> >     * @param src !null path of a directory
> >     * @param fs !null file system
> >     * @return  true on success
> >     */
> >    public static boolean expunge(Path src, FileSystem fs) {
> >        try {
> >            if (!fs.exists(src))
> >                return true;
> >            // break these out
> >            if (fs.getFileStatus(src).isDir()) {
> >                boolean doneOK = fs.delete(src, true);
> >                doneOK = !fs.exists(src);
> >                return doneOK;
> >            }
> >            if (fs.isFile(src)) {
> >                boolean doneOK = fs.delete(src, false);
> >                return doneOK;
> >            }
> >            throw new IllegalStateException("should be a file or directory if it exists");
> >        }
> >        catch (IOException e) {
> >            throw new RuntimeException(e);
> >        }
> >
> >    }
> >
> >    /**
> >     * this implementation of run allows the program to start with a
> >     * Configuration which may have been filled in by other code
> >     * @param conf !null conf
> >     * @param args  !null arguments
> >     * @return 0 on success
> >     * @throws Exception  on error
> >     */
> >    public int runJob(Configuration conf, String[] args) throws Exception {
> >        String[] realArgs = new String[2];
> >
> >        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
> >
> >        if (otherArgs.length > 0)
> >            realArgs[0] = otherArgs[0];
> >        else
> >            realArgs[0] = INPUT_FILE_PATH;
> >        if (otherArgs.length > 1)
> >            realArgs[1] = otherArgs[1];
> >        else
> >            realArgs[1] = OUTPUT_FILE_PATH;
> >
> >        Path inputPath = new Path(realArgs[0]);
> >        Path outputDir = new Path(realArgs[1]);
> >
> >        Job job = new Job(conf, "Substring Generator");
> >        conf = job.getConfiguration(); // NOTE the Job copies the configuration
> >        job.setJarByClass(SubstringCount.class);
> >        job.setMapperClass(SubStringMapper.class);
> >        job.setCombinerClass(IntSumReducer.class);
> >        job.setReducerClass(IntSumReducer.class);
> >
> >        job.setMapOutputKeyClass(Text.class);
> >        job.setMapOutputValueClass(IntWritable.class);
> >        job.setOutputKeyClass(Text.class);
> >        job.setOutputValueClass(IntWritable.class);
> >
> >        job.setNumReduceTasks(72); // todo: size for your cluster
> >        FileInputFormat.addInputPath(job, inputPath);
> >
> >
> >        FileSystem fileSystem = outputDir.getFileSystem(conf);
> >        // make sure the output directory is empty
> >        expunge(outputDir, fileSystem);    // make sure this does not exist
> >        FileOutputFormat.setOutputPath(job, outputDir);
> >
> >        // Now create something for the job to read
> >        guaranteeInputFile(fileSystem, inputPath);
> >
> >
> >        boolean ans = job.waitForCompletion(true);
> >        int ret = ans ? 0 : 1;
> >        return ret;
> >    }
> >
> >
> >    /**
> >     * Execute the command with the given arguments.
> >     *
> >     * @param args command specific arguments.
> >     * @return exit code.
> >     * @throws Exception
> >     */
> >    @Override
> >    public int run(final String[] args) throws Exception {
> >        Configuration conf = new Configuration();
> >        return runJob(conf, args);
> >    }
> >
> >    /**
> >     * @param args  args[0] is the path to a file to be created in the
> >     * FileSystem; args[1] is the path to an output directory in the file
> >     * system - the contents WILL be deleted
> >     * @throws Exception  on error
> >     */
> >    public static void main(String[] args) throws Exception {
> >        int ret = ToolRunner.run(new SubstringCount(), args);
> >        System.exit(ret);
> >    }
> > }
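> >
> > (To try it: package the class and submit it in the usual way - the jar
> > name here is hypothetical:
> >
> >    hadoop jar substringcount.jar SubstringCount BigInputLines.txt output
> >
> > The first run spends hours building the 4 GB input; once the file
> > exists, later runs reach the failure quickly.)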
> >
> > --
> > Steven M. Lewis PhD
> > 4221 105th Ave NE
> > Kirkland, WA 98033
> > 206-384-1340 (cell)
> > Skype lordjoe_com
>



-- 
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com
