hadoop-common-user mailing list archives

From Michael Segel <michael_se...@hotmail.com>
Subject Re: Problems with timeout when a Hadoop job generates a large number of key-value pairs
Date Fri, 20 Jan 2012 23:43:11 GMT
Steve, 
Ok, first: your client connection to the cluster is a non-issue.

If you go into /etc/Hadoop/conf 
That's supposed to be a little h, but my iPhone knows what's best...

Look and see what you have set for your bandwidth... I forget which parameter, but there are
only a couple that deal with bandwidth.
I think it's set to 1 MB or 10 MB by default. You need to up it to 100-200 MB if you're on a
1 Gb network.
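
If memory serves, the property is dfs.balance.bandwidthPerSec in hdfs-site.xml (the value is in
bytes per second, and each datanode reads it at startup). A sketch of what I mean - double-check
the name against your distro's hdfs-default.xml:

  <!-- hdfs-site.xml on each datanode: bandwidth the balancer may use, in bytes/sec -->
  <property>
    <name>dfs.balance.bandwidthPerSec</name>
    <!-- default is 1048576 (1 MB/s); ~100 MB/s shown here for a 1 Gb network -->
    <value>104857600</value>
  </property>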

That would solve your balancing issue.

See if that helps...

Sent from my iPhone

On Jan 20, 2012, at 4:57 PM, "Steve Lewis" <lordjoe2000@gmail.com> wrote:

> On Fri, Jan 20, 2012 at 12:18 PM, Michel Segel <michael_segel@hotmail.com>wrote:
> 
>> Steve,
>> If you want me to debug your code, I'll be glad to set up a billable
>> contract... ;-)
>> 
>> What I am willing to do is to help you to debug your code..
> 
> 
> The code seems to work well for small input files and is basically a
> standard sample.
> 
>> .
>> 
>> Did you time how long it takes in the Mapper.map() method?
>> The reason I asked this is to first confirm that you are failing within a
>> map() method.
>> It could be that you're just not updating your status...
>> 
> 
> The map() method starts out running very fast - generateSubStrings(), the
> only interesting part, runs in milliseconds. The only other thing the mapper
> does is context.write(), which SHOULD update status.
> 
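
For what it's worth, one quick way to rule the reporting side in or out is to ping the framework
explicitly from inside the loop. A rough, untested sketch against the map() you posted below
(context.progress() and context.setStatus() come with the new mapreduce API):

  for (int i = 0; i < data.length; i++) {
      word.set(data[i]);
      context.write(word, one);
      if (i % 1000 == 0) {
          // report liveness so a long-running loop does not trip the 600 sec task timeout
          context.progress();
          context.setStatus("wrote " + i + " of " + data.length + " substrings");
      }
  }

If the tasks still die at 600 sec with that in place, the time is being lost inside a single
blocking call (most likely the write/spill path) rather than in your own loop.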
>> 
>> You said that you are writing many output records for a single input.
>> 
>> So let's take a look at your code.
>> Are all writes of the same length? Meaning that in each iteration of
>> Mapper.map() you will always write K number of rows?
>> 
> 
> Because in my sample the input strings are the same length - every call to
> the mapper will write the same number of records.
> 
>> 
>> If so, ask yourself why some iterations are taking longer and longer?
>> 
> 
> I believe the issue may relate to local storage getting filled and Hadoop
> taking a LOT of time to rebalance the output. Assuming the string length is
> the same on each map, there is no reason for some iterations to be longer
> than others.
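
If you think disks filling up is part of it, "hadoop dfsadmin -report" will show per-datanode
capacity and remaining space. Bear in mind, though, that map output spills land under
mapred.local.dir on the task tracker nodes, not in HDFS, so it is worth checking those local
partitions on the slaves as well.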
> 
>> 
>> Note: I'm assuming that the time for each iteration is taking longer than
>> the previous...
>> 
> I assume so as well, since in my cluster the first 50% of mapping goes
> pretty fast.
> 
>> Or am I missing something?
>> 
> How do I get timing of map iterations??
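
The simplest thing I know of is to wrap the two halves of map() with System.currentTimeMillis()
and push the totals into counters from cleanup(), so they show up per task attempt in the job
tracker UI. A rough, untested sketch inside your SubStringMapper (the two long fields are new):

  private long substringMillis = 0;   // total time spent in generateSubStrings()
  private long writeMillis = 0;       // total time spent in the context.write() loop

  public void map(Object key, Text value, Context context)
          throws IOException, InterruptedException {
      long start = System.currentTimeMillis();
      String[] data = generateSubStrings(value.toString(), MINIMUM_LENGTH, MAXIMUM_LENGTH);
      long afterGenerate = System.currentTimeMillis();
      substringMillis += afterGenerate - start;
      for (int i = 0; i < data.length; i++) {
          word.set(data[i]);
          context.write(word, one);
      }
      writeMillis += System.currentTimeMillis() - afterGenerate;
  }

  protected void cleanup(Context context) throws IOException, InterruptedException {
      // the totals appear as user-defined counters for each task attempt
      context.getCounter("Timing", "generateSubStrings ms").increment(substringMillis);
      context.getCounter("Timing", "context.write ms").increment(writeMillis);
  }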
> 
>> -Mike
>> 
>> Sent from a remote device. Please excuse any typos...
>> 
>> Mike Segel
>> 
>> On Jan 20, 2012, at 11:16 AM, Steve Lewis <lordjoe2000@gmail.com> wrote:
>> 
>>> We have been having problems with mappers timing out after 600 sec when the
>>> mapper writes many more records - say thousands - for every
>>> input record, even when the code in the mapper is small and fast. I have
>>> no idea what could cause the system to be so slow and am reluctant to raise
>>> the 600 sec limit without understanding why there should be a timeout when
>>> all MY code is very fast.
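
(For reference: the 600 sec limit is mapred.task.timeout, which defaults to 600000 ms in the
pre-YARN releases, and it can be raised per job before the Job is created. That is a workaround
rather than a fix, and the property name may differ on newer versions:

  // in runJob(), before new Job(conf, ...): raise the per-task timeout to 30 minutes
  conf.setLong("mapred.task.timeout", 30 * 60 * 1000L);

Agreed, though, that it is better to understand the stall before touching it.)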
>>> I am enclosing a small sample which illustrates the problem. It will
>>> generate a 4 GB text file on HDFS if the input file does not exist or is not
>>> at least that size, and this will take some time (hours in my configuration).
>>> The code is essentially wordcount, but instead of finding and
>>> emitting words, the mapper emits all substrings of the input data. This
>>> generates a much larger volume of output data and number of output records
>>> than wordcount does.
>>> Still, the amount of data emitted is no larger than other data sets I know
>>> Hadoop can handle.
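
Just to put a number on "much larger": mirroring the two loops in the posted generateSubStrings()
with the posted constants (LINE_LENGTH = 100, MINIMUM_LENGTH = 5, MAXIMUM_LENGTH = 32), a quick
back-of-envelope - the class name is only for illustration:

  // Counts what a single map() call emits for one 100-character input line.
  public class AmplificationEstimate {
      public static void main(String[] args) {
          int lineLength = 100, minLength = 5, maxLength = 32;
          long records = 0, keyBytes = 0;
          for (int start = 0; start < lineLength - minLength; start++) {
              for (int end = start + minLength; end < Math.min(lineLength, start + maxLength); end++) {
                  records++;
                  keyBytes += end - start;   // key characters only, ignoring per-record overhead
              }
          }
          System.out.println(records + " records, " + keyBytes + " key bytes per 100-byte line");
      }
  }

That works out to roughly 2,200 records and ~38 KB of key data per 100-byte input line, so the
4 GB input turns into on the order of a couple of TB of map output (tens of billions of records)
before the combiner runs - which is where I would expect the time to go.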
>>> 
>>> All mappers on my 8 node cluster eventually time out after 600 sec - even
>>> though I see nothing in the code which is even a little slow, and I suspect
>>> that any slow behavior is in the called Hadoop code. This is similar to a
>>> problem we have in bioinformatics, where a colleague saw timeouts on his 50
>>> node cluster.
>>> 
>>> I would appreciate any help from the group. Note - if you have a text file
>>> of at least 4 GB, the program will take that as an input without trying to
>>> create its own file.
>>> /*
>>> 
>> ============================================================================================
>>> */
>>> import org.apache.hadoop.conf.*;
>>> import org.apache.hadoop.fs.*;
>>> import org.apache.hadoop.io.*;
>>> import org.apache.hadoop.mapreduce.*;
>>> import org.apache.hadoop.mapreduce.lib.input.*;
>>> import org.apache.hadoop.mapreduce.lib.output.*;
>>> import org.apache.hadoop.util.*;
>>> 
>>> import java.io.*;
>>> import java.util.*;
>>> /**
>>> * org.systemsbiology.hadoop.SubstringCount
>>> *
>>> * This illustrates an issue we are having where a mapper generating a
>>> much larger volume of
>>> * data and number of records times out even though the code is small,
>>> simple and fast
>>> *
>>> * NOTE!!! as written the program will generate a 4GB file in hdfs with
>>> good input data -
>>> * this is done only if the file does not exist but may take several
>>> hours. It will only be
>>> * done once. After that the failure is fairly fast
>>> *
>>> * What this will do is count unique substrings of lines of length
>>> * between MINIMUM_LENGTH and MAXIMUM_LENGTH by generating all
>>> * substrings and then using the word count algorithm.
>>> * What is interesting is that the number and volume of writes in the
>>> * map phase is MUCH larger than the number of reads and the volume of
>>> read data
>>> *
>>> * The example is artificial but similar to some real bioinformatics
>>> problems -
>>> *  for example finding all substrings in a genome can be important for
>>> the design of
>>> *  microarrays.
>>> *
>>> *  While the real problem is more complex - the problem is that
>>> *  when the input file is large enough the mappers time out failing to
>>> report after
>>> *  600 sec. There is nothing slow in any of the application code and
>>> *  nothing I can see that explains the timeouts.
>>> */
>>> public class SubstringCount  implements Tool   {
>>> 
>>> 
>>>   public static final long ONE_MEG = 1024 * 1024;
>>>   public static final long ONE_GIG = 1024 * ONE_MEG;
>>>   public static final int LINE_LENGTH = 100;
>>>   public static final Random RND = new Random();
>>> 
>>>  // NOTE - edit this line to be a sensible location in the current file
>>> system
>>>   public static final String INPUT_FILE_PATH = "BigInputLines.txt";
>>>  // NOTE - edit this line to be a sensible location in the current file
>>> system
>>>   public static final String OUTPUT_FILE_PATH = "output";
>>>    // NOTE - edit this line to be the input file size - 4 * ONE_GIG
>>> should be large but not a problem
>>>   public static final long DESIRED_LENGTH = 4 * ONE_GIG;
>>>   // NOTE - limits on substring length
>>>   public static final int MINIMUM_LENGTH = 5;
>>>   public static final int MAXIMUM_LENGTH = 32;
>>> 
>>> 
>>>   /**
>>>    * create an input file to read
>>>    * @param fs !null file system
>>>    * @param p  !null path
>>>    * @throws IOException on error
>>>    */
>>>   public static void guaranteeInputFile(FileSystem fs, Path p) throws
>>> IOException {
>>>       if (fs.isFile(p)) {
>>>           FileStatus fileStatus = fs.getFileStatus(p);
>>>           if (fileStatus.getLen() >= DESIRED_LENGTH)
>>>               return;
>>>       }
>>>       FSDataOutputStream open = fs.create(p);
>>>       PrintStream ps = new PrintStream(open);
>>>        long showInterval = DESIRED_LENGTH  / 100;
>>>       for (long i = 0; i < DESIRED_LENGTH; i += LINE_LENGTH) {
>>>           writeRandomLine(ps, LINE_LENGTH);
>>>           // show progress
>>>           if(i % showInterval == 0)  {
>>>               System.err.print(".");
>>> 
>>>           }
>>>       }
>>>       System.err.println("");
>>>       ps.close();
>>>   }
>>> 
>>>   /**
>>>    * write a line with a random string of capital letters
>>>    *
>>>    * @param pPs         -  output
>>>    * @param pLineLength length of the line
>>>    */
>>>   public static void writeRandomLine(final PrintStream pPs, final int
>>> pLineLength) {
>>>       StringBuilder sb = new StringBuilder();
>>>       for (int i = 0; i < pLineLength; i++) {
>>>           char c = (char) ('A' + RND.nextInt(26));
>>>           sb.append(c);
>>> 
>>>       }
>>>       String s = sb.toString();
>>>       pPs.println(s);
>>>   }
>>> 
>>> 
>>> 
>>>   /**
>>>    * Construct a Configured.
>>>    */
>>>   public SubstringCount() {
>>>   }
>>> 
>>> 
>>> 
>>>   /**
>>>    * similar to the Word Count mapper but one line generates a lot more
>>> output
>>>    */
>>>   public static class SubStringMapper
>>>           extends Mapper<Object, Text, Text, IntWritable> {
>>> 
>>>       /**
>>>        * generate a array of substrings
>>>        *
>>>        * @param inp       input long string
>>>        * @param minLength minimum substring length
>>>        * @param maxLength maximum substring length
>>>        * @return !null array of strings
>>>        */
>>>       public static String[] generateSubStrings(String inp, int
>>> minLength, int maxLength) {
>>>           List<String> holder = new ArrayList<String>();
>>>           for (int start = 0; start < inp.length() - minLength; start++) {
>>>               for (int end = start + minLength; end < Math.min(inp.length(), start + maxLength); end++) {
>>>                   try {
>>>                       holder.add(inp.substring(start, end));
>>>                   }
>>>                   catch (Exception e) {
>>>                       throw new RuntimeException(e);
>>> 
>>>                   }
>>>               }
>>>           }
>>> 
>>>           String[] ret = new String[holder.size()];
>>>           holder.toArray(ret);
>>>            return ret;
>>>       }
>>> 
>>>       private final IntWritable one = new IntWritable(1);
>>>       private final Text word = new Text();
>>> 
>>>       /**
>>>        * Like word count except the words are all substrings of the
>> input
>>> data
>>>        * This leads to a large increase
>>>        * @param key  irrelevant
>>>        * @param value  one read line
>>>        * @param context  !null context
>>>           */
>>>       public void map(Object key, Text value, Context context
>>>       ) throws IOException, InterruptedException {
>>>           String inp = value.toString();
>>>           // The written data is hundreds of times the input data
>>>           String[] data = generateSubStrings(inp, MINIMUM_LENGTH,
>>> MAXIMUM_LENGTH);
>>>           for (int i = 0; i < data.length; i++) {
>>>               String s = data[i];
>>>               word.set(s);
>>>               context.write(word, one);
>>>           }
>>>       }
>>>   }
>>> 
>>>   /**
>>>    * Essentially the same reducer used by word count
>>>    */
>>>   public static class IntSumReducer
>>>           extends Reducer<Text, IntWritable, Text, IntWritable> {
>>>       private final IntWritable result = new IntWritable();
>>> 
>>>       public void reduce(Text key, Iterable<IntWritable> values,
>>>                          Context context
>>>       ) throws IOException, InterruptedException {
>>>           int sum = 0;
>>>           for (IntWritable val : values) {
>>>               sum += val.get();
>>>           }
>>>           result.set(sum);
>>>           context.write(key, result);
>>>       }
>>>   }
>>> 
>>> 
>>>   /**
>>>    * kill a directory and all contents
>>>    * useful to make sure the output directory is empty
>>>    * @param src !null path of a directory
>>>    * @param fs !null file system
>>>    * @return  true on success
>>>    */
>>>   public static boolean expunge(Path src, FileSystem fs) {
>>>       try {
>>>           if (!fs.exists(src))
>>>               return true;
>>>           // break these out
>>>           if (fs.getFileStatus(src).isDir()) {
>>>               boolean doneOK = fs.delete(src, true);
>>>               doneOK = !fs.exists(src);
>>>               return doneOK;
>>>           }
>>>           if (fs.isFile(src)) {
>>>               boolean doneOK = fs.delete(src, false);
>>>               return doneOK;
>>>           }
>>>           throw new IllegalStateException("should be file or directory if it exists");
>>>       }
>>>       catch (IOException e) {
>>>           throw new RuntimeException(e);
>>>       }
>>> 
>>>   }
>>> 
>>>    /**
>>>     * this implementation of run allows the program to start with a
>>> Configuration which may
>>>     * have been filled by other code
>>>     * @param conf !null conf
>>>     * @param args  !null arguments
>>>     * @return 0 on success
>>>     * @throws Exception  on error
>>>     */
>>>   public int runJob(Configuration conf, String[] args) throws Exception
>> {
>>>       String[] realArgs = new String[2];
>>> 
>>>       String[] otherArgs = new GenericOptionsParser(conf,
>>> args).getRemainingArgs();
>>> 
>>>       if(otherArgs.length > 0)
>>>            realArgs[0] = otherArgs[0];
>>>        else
>>>            realArgs[0] = INPUT_FILE_PATH;
>>>       if(otherArgs.length > 1)
>>>            realArgs[1] = otherArgs[1];
>>>        else
>>>            realArgs[1] = OUTPUT_FILE_PATH;
>>> 
>>>       Path InputPath = new Path(realArgs[0]);
>>>       Path outputDir =  new Path(realArgs[1]);
>>> 
>>>        Job job = new Job(conf, "Substring Generator");
>>>       conf = job.getConfiguration(); // NOTE Job copies the configuration
>>>       job.setJarByClass(SubstringCount.class);
>>>       job.setMapperClass(SubStringMapper.class);
>>>       job.setCombinerClass(IntSumReducer.class);
>>>       job.setReducerClass(IntSumReducer.class);
>>> 
>>>       job.setMapOutputKeyClass(Text.class);
>>>       job.setMapOutputValueClass(IntWritable.class);
>>>       job.setOutputKeyClass(Text.class);
>>>       job.setOutputValueClass(IntWritable.class);
>>> 
>>>        job.setNumReduceTasks(72); // todo size for your cluster
>>>       FileInputFormat.addInputPath(job,InputPath);
>>> 
>>> 
>>>       FileSystem fileSystem = outputDir.getFileSystem(conf);
>>> // make sure the output directory is empty
>>>       expunge(outputDir, fileSystem);    // make sure this does not exist
>>>       FileOutputFormat.setOutputPath(job, outputDir);
>>> 
>>>       // Now create something for the job to read
>>>       guaranteeInputFile(fileSystem, InputPath);
>>> 
>>> 
>>>       boolean ans = job.waitForCompletion(true);
>>>       int ret = ans ? 0 : 1;
>>>       return ret;
>>>   }
>>> 
>>> 
>>>   /**
>>>    * Execute the command with the given arguments.
>>>    *
>>>    * @param args command specific arguments.
>>>    * @return exit code.
>>>    * @throws Exception
>>>    */
>>>   @Override
>>>   public int run(final String[] args) throws Exception {
>>>       Configuration conf = new Configuration();
>>>       return runJob(conf, args);
>>>   }
>>> 
>>>   /**
>>>    *
>>>    * @param args  args[0] is the path to a file to be created in the
>>> FileSystem
>>>    * args[1] is the path to an output directory in the file system - the
>>> contents WILL be deleted
>>>    * @throws Exception  on error
>>>    */
>>>   public static void main(String[] args) throws Exception {
>>>       ToolRunner.run(new SubstringCount(), args);
>>>   }
>>> }
>>> 
>>> --
>>> Steven M. Lewis PhD
>>> 4221 105th Ave NE
>>> Kirkland, WA 98033
>>> 206-384-1340 (cell)
>>> Skype lordjoe_com
>> 
> 
> 
> 
> -- 
> Steven M. Lewis PhD
> 4221 105th Ave NE
> Kirkland, WA 98033
> 206-384-1340 (cell)
> Skype lordjoe_com
