hadoop-mapreduce-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Keren Ouaknine <ker...@gmail.com>
Subject Global sorting fails
Date Tue, 02 Sep 2014 18:24:51 GMT
Hello,

As a follow-up to this question
<http://mail-archives.apache.org/mod_mbox/hadoop-mapreduce-user/201408.mbox/%3CCAOcM5s9g9WzdfcJFxLvy0K7R3ezmvfuQfVoXcv=tXaJHCP3HWQ@mail.gmail.com%3E>,
I ended up changing the global sorting to use two chained MR jobs.
Thanks to the MapReduce Design Pattern book for the inspiration:
https://github.com/adamjshook/mapreducepatterns/blob/master/MRDP/src/main/java/mrdp/ch4/TotalOrderSorting.java

The code is as follows, *the mapper *:
 public static class Mapp extends Mapper<LongWritable, Text, Text, Text> {
 public void map(LongWritable key, Text value, Context context) throws
IOException, InterruptedException {
 System.out.println("MAPPER 1");

 // Split the line
List<Text> fields = Library.splitLine(value, ' ');
 if (fields.size() != 9) return;
System.out.println(fields.get(3));
 context.write(fields.get(3), value);
}
 }

 *The reducer: *

public static class Reduce extends Reducer<Text, Text, Text, Text> {

public void reduce(Text key, Iterable<Text> values, Context context) throws
IOException, InterruptedException {
 System.out.println("REDUCER 1");
for (Text val : values) {
 context.write(key, val);
}
}
 }

*and my driver code:*
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();

 String[] margs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (margs.length != 3) {
 System.err.println("Usage: TotalOrderSorting <user data> <out> parallel");
System.exit(1);
 }
System.out.println("in :"+margs[0]+" out :"+margs[1]+" parallel:
"+margs[2]);
 Path inputPath     = new Path(margs[0] + "/page_views");
String outputdir   = margs[1]+"/L9out";
 Path outputOrder   = new Path(outputdir);
Path partitionFile = new Path(outputdir + "_partitions.lst");
 Path outputStage   = new Path(outputdir + "_staging");
 Job job = new Job(conf, "L9 MR - TotalOrderSortingStage");
job.setJarByClass(L9.class);
 // Use the mapper implementation with zero reduce tasks
 job.setMapperClass(Mapp.class);
job.setNumReduceTasks(0);

 job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

TextInputFormat.setInputPaths(job, inputPath);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
 SequenceFileOutputFormat.setOutputPath(job, outputStage);

// Submit the job and get completion code.
 int code = job.waitForCompletion(true) ? 0 : 1;
if (code == 0) {
 Job orderJob = new Job(conf, "TotalOrderSortingStage");
orderJob.setJarByClass(L9.class);

// Here, use the identity mapper to output the key/value pairs in
// the SequenceFile
 orderJob.setMapperClass(Mapper.class);
orderJob.setReducerClass(Reduce.class);

// Set the number of reduce tasks to an appropriate number for the
// amount of data being sorted
 //orderJob.setNumReduceTasks(3);
orderJob.setNumReduceTasks(400);

// Use Hadoop's TotalOrderPartitioner class
orderJob.setPartitionerClass(TotalOrderPartitioner.class);

// Set the partition file
TotalOrderPartitioner.setPartitionFile(orderJob.getConfiguration(),
 partitionFile);

orderJob.setOutputKeyClass(Text.class);
 orderJob.setOutputValueClass(Text.class);

// Set the input to the previous job's output
 orderJob.setInputFormatClass(SequenceFileInputFormat.class);
SequenceFileInputFormat.setInputPaths(orderJob, outputStage);

// Set the output path to the command line parameter
TextOutputFormat.setOutputPath(orderJob, outputOrder);
 // Set the separator to an empty string
 orderJob.getConfiguration().set(
"mapred.textoutputformat.separator", "");

// Use the InputSampler to go through the output of the previous
// job, sample it, and create the partition file
 //InputSampler.writePartitionFile(orderJob,new
InputSampler.RandomSampler<Text, Text>(0.5, 3, 2));
//InputSampler.writePartitionFile(orderJob,new
InputSampler.RandomSampler<Text, Text>(.001, 399, 444));
 InputSampler.writePartitionFile(orderJob,new
InputSampler.RandomSampler<Text, Text>(.001, 10000));
// Submit the job
 code = orderJob.waitForCompletion(true) ? 0 : 2;
}

// Cleanup the partition file and the staging directory
 //FileSystem.get(new Configuration()).delete(partitionFile, false);
//FileSystem.get(new Configuration()).delete(outputStage, true);
 //FileSystem.get(new Configuration()).delete(outputOrder, true);

 System.exit(code);
}

 }

For some reason, on the second job I get an error as follows:
​     [exec] 14/09/01 20:08:48 INFO mapred.JobClient: Task Id :
attempt_201408311908_0046_m_000029_1, Status : FAILED
     [exec] java.lang.IllegalArgumentException: Can't read partitions file
     [exec]     at
org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner.setConf(TotalOrderPartitioner.java:116)
     [exec]     at
org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:62)
     [exec]     at
org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:117)
     [exec]     at
org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:676)
     [exec]     at
org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:756)
     [exec]     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
     [exec]     at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
     [exec]     at java.security.AccessController.doPrivileged(Native
Method)
     [exec]     at javax.security.auth.Subject.doAs(Subject.java:415)
     [exec]     at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
     [exec]     at org.apache.hadoop.mapred.Child.main(Child.java:249)
     [exec] Caused by: java.io.IOException: Split points are out of order
     [exec]     at
org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner.setConf(TotalOrderPartitioner.java:96)
     [exec]     ... 10 more
​
As I am using the SplitSampler "out of the box​​", I am not sure how these
managed to get unordered?? The result is that the first job passes fine,
but the map phase of the second job fails with the above exception. Thanks
for the help!

Keren

-- 
Keren Ouaknine
www.kereno.com

Mime
View raw message