hadoop-user mailing list archives

From: "Ruiz, Pierre" <Pierre.R...@analog.com>
Subject: mapreduce.job.reduces greater than 1 crashes TotalOrderPartitioner and/or RandomInputSampler
Date: Fri, 10 Nov 2017 12:49:53 GMT
First-time poster here; I hope I'm not breaking any rules, and if I am please tell me :)

I am trying to perform a "total order" sort of data previously stored as a SequenceFile
(keys: IntWritable, values: Text) on a pseudo-distributed HDFS cluster running with YARN.
The SequenceFile output, stored at /cnts/part-r-00000 through part-r-00006, contains 44
records altogether.

bin/hdfs dfs -text /cnts* prints 44 lines like:
2       314     |       12      |       21
2       700     |       12      |       21
1       700     |       12      |       28
1       2       |       420     |       11120
2       2       |       11      |       3
2       700     |       11      |       3
1       700     |       12      |       19
(...)
1       314     |       12      |       30
3       314     |       420     |       6
3       700     |       420     |       6
3       2       |       420     |       6
1       2       |       421     |       36
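For reference, the records can also be inspected outside MapReduce with the stock
SequenceFile.Reader API; the standalone sketch below is just the standard reading pattern
(the class name and the hard-coded path are placeholders, not part of my actual tool):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// Placeholder class name; dumps one part file's records to stdout.
public class DumpCounts {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path part = new Path("/cnts/part-r-00000");  // placeholder: one of the seven part files
        try (SequenceFile.Reader reader =
                 new SequenceFile.Reader(conf, SequenceFile.Reader.file(part))) {
            IntWritable key = new IntWritable();     // same key type the sort job reads
            Text value = new Text();                 // same value type the sort job reads
            while (reader.next(key, value)) {
                System.out.println(key + "\t" + value);
            }
        }
    }
}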

I run the job as follows, within main:
int exitCode = -1;
if ("sort".equals(args[0])) {
    // ToolRunner parses generic options (e.g. -D ...) before calling Sorter.run()
    exitCode = ToolRunner.run(new Configuration(),
                              new Sorter(),
                              Arrays.copyOfRange(args, 1, args.length));
}

Sorter extends Configured and implements Tool; its run() method looks like this:
public int run(String[] args) throws Exception {
    (...)
    //Set job-specific params
    job.setJobName("sort");
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setPartitionerClass(TotalOrderPartitioner.class);

    // Setup and run subset sampling
    double p = 0.1;          // freq: probability that any given record is sampled
    int maxNbSamples = 10;   // numSamples: cap on the total number of samples kept
    int maxNbSplitsRead = 3; // maxSplitsSampled: cap on the number of splits examined
    InputSampler.Sampler<IntWritable, Text> sampler =
          new InputSampler.RandomSampler<IntWritable,Text>(p,
                                                           maxNbSamples,
                                                           maxNbSplitsRead);
    InputSampler.writePartitionFile(job, sampler);

    // Submit the job & poll for progress until it completes
    return job.waitForCompletion(true) ? 0 : -1;
}
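
For context, the usual TotalOrderPartitioner wiring around code like the above involves a
partition file location, the reducer count and the input/output paths; the fragment below is
only a sketch of that standard pattern, with placeholder paths and a placeholder reducer count
rather than my exact values:

    // Sketch of the standard wiring only -- paths and reducer count are placeholders.
    // Assumes the usual imports: org.apache.hadoop.fs.Path,
    // org.apache.hadoop.mapreduce.lib.input.FileInputFormat,
    // org.apache.hadoop.mapreduce.lib.output.FileOutputFormat,
    // org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner.
    Configuration conf = job.getConfiguration();

    // InputSampler.writePartitionFile() writes the sampled split points to this
    // path, and TotalOrderPartitioner.setConf() reads them back at task start-up.
    TotalOrderPartitioner.setPartitionFile(conf, new Path("/tmp/_partitions.lst"));

    // The partition file holds (numReduceTasks - 1) split points, so the reducer
    // count must already be set when writePartitionFile() is called.
    job.setNumReduceTasks(4);  // placeholder; normally supplied via -D mapreduce.job.reduces=N

    FileInputFormat.addInputPath(job, new Path("/cnts"));           // input SequenceFiles
    FileOutputFormat.setOutputPath(job, new Path("/cnts-sorted"));  // placeholder output dir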

I use the default "identity" Mapper and Reducer classes together with the TotalOrderPartitioner
and an InputSampler.RandomSampler to configure the partitioning.  The job works fine as long as
I use a single reduce task, but it becomes unstable when run with -Dmapreduce.job.reduces=N for
N > 1.

By unstable, I mean that, apparently at random, the job either
* terminates successfully;
* fails with an ArrayIndexOutOfBoundsException in InputSampler.writePartitionFile() and exits;
* or fails with "IllegalArgumentException: Can't read partitions file" in TotalOrderPartitioner.setConf()
and crashes my machine.

The relative frequency of these three outcomes seems to vary slightly when I change the values
passed to the RandomSampler constructor and the number of reduce tasks, but the tool is never
stable.
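
For what it's worth, the reducer count that the -D option actually resolves to can be checked
inside run(), before the sampling step, with something like the snippet below; the println is
purely illustrative:

    // ToolRunner runs the arguments through GenericOptionsParser, so a -D option
    // is already applied to the job's Configuration at this point.
    int nbReduces = job.getNumReduceTasks();            // backed by mapreduce.job.reduces
    System.out.println("reduce tasks = " + nbReduces);  // illustrative only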

Can anyone shed light on this at all?
