hadoop-common-user mailing list archives

From Steve Lewis <lordjoe2...@gmail.com>
Subject Re: Problems with timeout when a Hadoop job generates a large number of key-value pairs
Date Fri, 20 Jan 2012 22:16:12 GMT
Good catch on the Configured - in my tests it extends my subclass of
Configured, but I took out any dependencies on my environment.

Interesting - I strongly suspect a disk IO or network problem, since my code
is very simple and very fast.
If you add lines to generateSubStrings to limit the String length to 100
characters (I think it is always that, but this makes sure), then nothing in
my code should be slow - substring generation is very fast at that size.

I also suspect that the problem in disk I/O may occur in a manner dependent
on the size and storage available on the nodes. What I find looking at the
Hadoop job tracker UI is that the mappers complete the first 25% of the work
relatively rapidly and, in my hands, the timeout occurs in the last half of
the map. We are having issues with a 10 GB file and, if it is not too much
trouble, you might try a larger value for DESIRED_LENGTH - say 20 GB.
Also, how big is your cluster and how long do the mappers take?
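
For the DESIRED_LENGTH change I mean something like the following one-line
edit to the posted code (ONE_GIG is the constant already defined there):

    // roughly 20 GB of generated input instead of 4 GB
    public static final long DESIRED_LENGTH = 20 * ONE_GIG;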

public static String[] generateSubStrings(String inp, int minLength, int maxLength) {
    // guarantee no more than 100 characters
    if (inp.length() > 100)
        inp = inp.substring(0, 100);
    List<String> holder = new ArrayList<String>();
    for (int start = 0; start < inp.length() - minLength; start++) {
        for (int end = start + minLength; end < Math.min(inp.length(), start + maxLength); end++) {
            try {
                holder.add(inp.substring(start, end));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
    String[] ret = new String[holder.size()];
    holder.toArray(ret);
    return ret;
}

On Fri, Jan 20, 2012 at 12:41 PM, Alex Kozlov <alexvk@cloudera.com> wrote:

> Hi Steve, I ran your job on our cluster and it does not time out.  I noticed
> that each mapper runs for a long time: one way to avoid a timeout is to
> update a user counter.  As long as this counter is updated within 10
> minutes, the task should not time out (as MR knows that something is being
> done).  Normally an output bytes counter would be updated, but if the job
> is stuck somewhere doing something it will time out.  I agree that there
> might be a disk IO or network problem that causes a long wait, but without
> detailed logs it's hard to tell.
>
> On a side note, the SubstringCount class should extend Configured.
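>
> For example, something along these lines in your map loop should keep the
> task alive (just a sketch, untested against your code; the counter group
> and name are arbitrary):
>
>            for (int i = 0; i < data.length; i++) {
>                word.set(data[i]);
>                context.write(word, one);
>                // bump a user counter so the framework sees the task is alive
>                context.getCounter("SubstringCount", "SubstringsEmitted").increment(1);
>            }
>
> Calling context.progress() periodically should also work.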
>
> --
> Alex K
> <http://www.cloudera.com/company/press-center/hadoop-world-nyc/>
>
> On Fri, Jan 20, 2012 at 12:18 PM, Michel Segel <michael_segel@hotmail.com> wrote:
>
> > Steve,
> > If you want me to debug your code, I'll be glad to set up a billable
> > contract... ;-)
> >
> > What I am willing to do is to help you to debug your code...
> >
> > Did you time how long it takes in the Mapper.map() method?
> > The reason I asked this is to first confirm that you are failing within a
> > map() method.
> > It could be that you're just not updating your status...
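> >
> > If it helps, something like this around the body of map() would show where
> > the time goes (a rough sketch using the field and constant names from your
> > posted code; the 1000 ms threshold is arbitrary):
> >
> >        public void map(Object key, Text value, Context context
> >        ) throws IOException, InterruptedException {
> >            long start = System.currentTimeMillis();
> >            String[] data = generateSubStrings(value.toString(),
> >                    MINIMUM_LENGTH, MAXIMUM_LENGTH);
> >            long afterGenerate = System.currentTimeMillis();
> >            for (String s : data) {
> >                word.set(s);
> >                context.write(word, one);
> >            }
> >            long afterWrite = System.currentTimeMillis();
> >            // log only the slow calls so the task logs stay readable
> >            if (afterWrite - start > 1000)
> >                System.err.println("map(): generate=" + (afterGenerate - start)
> >                        + " ms, write=" + (afterWrite - afterGenerate) + " ms");
> >        }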
> >
> > You said that you are writing many output records for a single input.
> >
> > So let's take a look at your code.
> > Are all writes of the same length? Meaning that in each iteration of
> > Mapper.map() you will always write K rows?
> >
> > If so, ask yourself why some iterations are taking longer and longer?
> >
> > Note: I'm assuming that the time for each iteration is taking longer than
> > the previous...
> >
> > Or am I missing something?
> >
> > -Mike
> >
> > Sent from a remote device. Please excuse any typos...
> >
> > Mike Segel
> >
> > On Jan 20, 2012, at 11:16 AM, Steve Lewis <lordjoe2000@gmail.com> wrote:
> >
> > > We have been having problems with mappers timing out after 600 sec when
> > > the mapper writes many more records - say thousands - for every
> > > input record, even when the code in the mapper is small and fast. I have
> > > no idea what could cause the system to be so slow and am reluctant to
> > > raise the 600 sec limit without understanding why there should be a
> > > timeout when all MY code is very fast.
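> > > For reference, I believe the limit in question is mapred.task.timeout,
> > > which is set in milliseconds (600000 = 10 minutes by default). If I did
> > > want to raise it, I think a line like the one below would do it - but as
> > > I said, I would rather understand the cause (untested sketch):
> > >
> > >        Configuration conf = new Configuration();
> > >        // raise the task timeout from 10 to 20 minutes (value is in ms)
> > >        conf.setLong("mapred.task.timeout", 20 * 60 * 1000);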
> > > I am enclosing a small sample which illustrates the problem. It will
> > > generate a 4GB text file on hdfs if the input file does not exist or is
> > > not at least that size, and this will take some time (hours in my
> > > configuration). The code is essentially wordcount, but instead of finding
> > > and emitting words, the mapper emits all substrings of the input data -
> > > this generates much more output data and many more output records than
> > > wordcount generates.
> > > Still, the amount of data emitted is no larger than other data sets I
> > > know Hadoop can handle.
> > >
> > > All mappers on my 8 node cluster eventually time out after 600 sec -
> > > even though I see nothing in the code which is even a little slow, and I
> > > suspect that any slow behavior is in the called Hadoop code. This is
> > > similar to a problem we have in bioinformatics where a colleague saw
> > > timeouts on his 50 node cluster.
> > >
> > > I would appreciate any help from the group. Note - if you have a text
> > > file of at least 4 GB, the program will take that as an input without
> > > trying to create its own file.
> > > /*
> > > ============================================================================================
> > > */
> > > import org.apache.hadoop.conf.*;
> > > import org.apache.hadoop.fs.*;
> > > import org.apache.hadoop.io.*;
> > > import org.apache.hadoop.mapreduce.*;
> > > import org.apache.hadoop.mapreduce.lib.input.*;
> > > import org.apache.hadoop.mapreduce.lib.output.*;
> > > import org.apache.hadoop.util.*;
> > >
> > > import java.io.*;
> > > import java.util.*;
> > > /**
> > >  * org.systemsbiology.hadoop.SubstringGenerator
> > >  *
> > >  * This illustrates an issue we are having where a mapper generating a
> > >  * much larger volume of data and number of records times out even
> > >  * though the code is small, simple and fast.
> > >  *
> > >  * NOTE!!! as written the program will generate a 4GB file in hdfs with
> > >  * good input data - this is done only if the file does not exist but
> > >  * may take several hours. It will only be done once. After that the
> > >  * failure is fairly fast.
> > >  *
> > >  * What this will do is count unique substrings of lines of length
> > >  * between MINIMUM_LENGTH and MAXIMUM_LENGTH by generating all
> > >  * substrings and then using the word count algorithm.
> > >  * What is interesting is that the number and volume of writes in the
> > >  * map phase is MUCH larger than the number of reads and the volume of
> > >  * read data.
> > >  *
> > >  * The example is artificial but similar to some real bioinformatics
> > >  * problems - for example, finding all substrings in a genome can be
> > >  * important for the design of microarrays.
> > >  *
> > >  * While the real problem is more complex - the problem is that
> > >  * when the input file is large enough the mappers time out, failing to
> > >  * report after 600 sec. There is nothing slow in any of the application
> > >  * code and nothing I can point to that explains the delay.
> > > */
> > > public class SubstringCount  implements Tool   {
> > >
> > >
> > >    public static final long ONE_MEG = 1024 * 1024;
> > >    public static final long ONE_GIG = 1024 * ONE_MEG;
> > >    public static final int LINE_LENGTH = 100;
> > >    public static final Random RND = new Random();
> > >
> > >    // NOTE - edit this line to be a sensible location in the current file system
> > >    public static final String INPUT_FILE_PATH = "BigInputLines.txt";
> > >    // NOTE - edit this line to be a sensible location in the current file system
> > >    public static final String OUTPUT_FILE_PATH = "output";
> > >    // NOTE - edit this line to be the input file size - 4 * ONE_GIG should be large but not a problem
> > >    public static final long DESIRED_LENGTH = 4 * ONE_GIG;
> > >    // NOTE - limits on substring length
> > >    public static final int MINIMUM_LENGTH = 5;
> > >    public static final int MAXIMUM_LENGTH = 32;
> > >
> > >
> > >    /**
> > >     * create an input file to read
> > >     * @param fs !null file system
> > >     * @param p  !null path
> > >     * @throws IOException on error
> > >     */
> > >    public static void guaranteeInputFile(FileSystem fs, Path p) throws
> > > IOException {
> > >        if (fs.isFile(p)) {
> > >            FileStatus fileStatus = fs.getFileStatus(p);
> > >            if (fileStatus.getLen() >= DESIRED_LENGTH)
> > >                return;
> > >        }
> > >        FSDataOutputStream open = fs.create(p);
> > >        PrintStream ps = new PrintStream(open);
> > >         long showInterval = DESIRED_LENGTH  / 100;
> > >        for (long i = 0; i < DESIRED_LENGTH; i += LINE_LENGTH) {
> > >            writeRandomLine(ps, LINE_LENGTH);
> > >            // show progress
> > >            if(i % showInterval == 0)  {
> > >                System.err.print(".");
> > >
> > >            }
> > >        }
> > >        System.err.println("");
> > >        ps.close();
> > >    }
> > >
> > >    /**
> > >     * write a line with a random string of capital letters
> > >     *
> > >     * @param pPs         -  output
> > >     * @param pLineLength length of the line
> > >     */
> > >    public static void writeRandomLine(final PrintStream pPs, final int
> > > pLineLength) {
> > >        StringBuilder sb = new StringBuilder();
> > >        for (int i = 0; i < pLineLength; i++) {
> > >            char c = (char) ('A' + RND.nextInt(26));
> > >            sb.append(c);
> > >
> > >        }
> > >        String s = sb.toString();
> > >        pPs.println(s);
> > >    }
> > >
> > >
> > >
> > >    /**
> > >     * Construct a Configured.
> > >     */
> > >    public SubstringCount() {
> > >    }
> > >
> > >
> > >
> > >    /**
> > >     * similar to the Word Count mapper but one line generates a lot more output
> > >     */
> > >    public static class SubStringMapper
> > >            extends Mapper<Object, Text, Text, IntWritable> {
> > >
> > >        /**
> > >         * generate a array of substrings
> > >         *
> > >         * @param inp       input long string
> > >         * @param minLength minimum substring length
> > >         * @param maxLength maximum substring length
> > >         * @return !null array of strings
> > >         */
> > >        public static String[] generateSubStrings(String inp, int
> > > minLength, int maxLength) {
> > >            List<String> holder = new ArrayList<String>();
> > >            for (int start = 0; start < inp.length() - minLength;
> > start++) {
> > >                for (int end = start + minLength; end <
> > > Math.min(inp.length(), start + maxLength); end++) {
> > >                    try {
> > >                        holder.add(inp.substring(start, end));
> > >                    }
> > >                    catch (Exception e) {
> > >                        throw new RuntimeException(e);
> > >
> > >                    }
> > >                }
> > >            }
> > >
> > >            String[] ret = new String[holder.size()];
> > >            holder.toArray(ret);
> > >             return ret;
> > >        }
> > >
> > >        private final IntWritable one = new IntWritable(1);
> > >        private final Text word = new Text();
> > >
> > >        /**
> > >         * Like word count except the words are all substrings of the input data.
> > >         * This leads to a large increase in output.
> > >         * @param key  irrelevant
> > >         * @param value  one read line
> > >         * @param context  !null context
> > >         */
> > >        public void map(Object key, Text value, Context context
> > >        ) throws IOException, InterruptedException {
> > >            String inp = value.toString();
> > >            // The written data is hundreds of times the input data
> > >            String[] data = generateSubStrings(inp, MINIMUM_LENGTH,
> > > MAXIMUM_LENGTH);
> > >            for (int i = 0; i < data.length; i++) {
> > >                String s = data[i];
> > >                word.set(s);
> > >                context.write(word, one);
> > >            }
> > >        }
> > >    }
> > >
> > >    /**
> > >     * Essentially the same reducer used by word count
> > >     */
> > >    public static class IntSumReducer
> > >            extends Reducer<Text, IntWritable, Text, IntWritable> {
> > >        private final IntWritable result = new IntWritable();
> > >
> > >        public void reduce(Text key, Iterable<IntWritable> values,
> > >                           Context context
> > >        ) throws IOException, InterruptedException {
> > >            int sum = 0;
> > >            for (IntWritable val : values) {
> > >                sum += val.get();
> > >            }
> > >            result.set(sum);
> > >            context.write(key, result);
> > >        }
> > >    }
> > >
> > >
> > >    /**
> > >     * kill a directory and all contents
> > >     * useful to make sure the output directory is empty
> > >     * @param src !null path of a directory
> > >     * @param fs !null file system
> > >     * @return  true on success
> > >     */
> > >    public static boolean expunge(Path src, FileSystem fs) {
> > >        try {
> > >            if (!fs.exists(src))
> > >                return true;
> > >            // break these out
> > >            if (fs.getFileStatus(src).isDir()) {
> > >                boolean doneOK = fs.delete(src, true);
> > >                doneOK = !fs.exists(src);
> > >                return doneOK;
> > >            }
> > >            if (fs.isFile(src)) {
> > >                boolean doneOK = fs.delete(src, false);
> > >                return doneOK;
> > >            }
> > >            throw new IllegalStateException("should be file or directory if it exists");
> > >        }
> > >        catch (IOException e) {
> > >            throw new RuntimeException(e);
> > >        }
> > >
> > >    }
> > >
> > >     /**
> > >      * this implementation of run allows the program to start with a
> > > Configuration which may
> > >      * have been filled by other code
> > >      * @param conf !null conf
> > >      * @param args  !null arguments
> > >      * @return 0 on success
> > >      * @throws Exception  on error
> > >      */
> > >    public int runJob(Configuration conf, String[] args) throws Exception {
> > >        String[] realArgs = new String[2];
> > >
> > >        String[] otherArgs = new GenericOptionsParser(conf,
> > > args).getRemainingArgs();
> > >
> > >        if(otherArgs.length > 0)
> > >             realArgs[0] = otherArgs[0];
> > >         else
> > >             realArgs[0] = INPUT_FILE_PATH;
> > >        if(otherArgs.length > 1)
> > >             realArgs[1] = otherArgs[1];
> > >         else
> > >             realArgs[1] = OUTPUT_FILE_PATH;
> > >
> > >        Path InputPath = new Path(realArgs[0]);
> > >        Path outputDir =  new Path(realArgs[1]);
> > >
> > >         Job job = new Job(conf, "Substring Generator");
> > >        conf = job.getConfiguration(); // NOTE Job copies the configuration
> > >        job.setJarByClass(SubstringCount.class);
> > >        job.setMapperClass(SubStringMapper.class);
> > >        job.setCombinerClass(IntSumReducer.class);
> > >        job.setReducerClass(IntSumReducer.class);
> > >
> > >        job.setMapOutputKeyClass(Text.class);
> > >        job.setMapOutputValueClass(IntWritable.class);
> > >        job.setOutputKeyClass(Text.class);
> > >        job.setOutputValueClass(IntWritable.class);
> > >
> > >         job.setNumReduceTasks(72); // todo size for your cluster
> > >        FileInputFormat.addInputPath(job,InputPath);
> > >
> > >
> > >        FileSystem fileSystem = outputDir.getFileSystem(conf);
> > > // make sure the output directory is empty
> > >        expunge(outputDir, fileSystem);    // make sure this does not exist
> > >        FileOutputFormat.setOutputPath(job, outputDir);
> > >
> > >        // Now create something for the job to read
> > >        guaranteeInputFile(fileSystem, InputPath);
> > >
> > >
> > >        boolean ans = job.waitForCompletion(true);
> > >        int ret = ans ? 0 : 1;
> > >        return ret;
> > >    }
> > >
> > >
> > >    /**
> > >     * Execute the command with the given arguments.
> > >     *
> > >     * @param args command specific arguments.
> > >     * @return exit code.
> > >     * @throws Exception
> > >     */
> > >    @Override
> > >    public int run(final String[] args) throws Exception {
> > >        Configuration conf = new Configuration();
> > >        return runJob(conf, args);
> > >    }
> > >
> > >    /**
> > >     *
> > >     * @param args  args[0] is the path to a file to be created in the FileSystem;
> > >     * args[1] is the path to an output directory in the file system - the contents WILL be deleted
> > >     * @throws Exception  on error
> > >     */
> > >    public static void main(String[] args) throws Exception {
> > >        ToolRunner.run(new SubstringCount(), args);
> > >    }
> > > }
> > >
> > > --
> > > Steven M. Lewis PhD
> > > 4221 105th Ave NE
> > > Kirkland, WA 98033
> > > 206-384-1340 (cell)
> > > Skype lordjoe_com
> >
>



-- 
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com
