hadoop-mapreduce-user mailing list archives

From Christian Schneider <cschneiderpub...@gmail.com>
Subject Write huge values in Reduce Phase. "Hacked Outputformat" vs. "Direct write to HDFS" vs. ???
Date Wed, 13 Mar 2013 18:36:18 GMT
Hi all.
I'm not sure which approach to use. Currently I have two; could you have a
look and tell me which is best?

As the "value" of the reduce phase I get a list with *a lot* of values
(larger than the heap size).
For a legacy system I need to create a file like this:
For a legacy system I need to create a file like this:

key1 value1, value2, value3, .... valueN
key2 value1, value2, value3, .... valueN

N > 1.000.000

During my research, and from some other mails, I came up with these two solutions:

*a) "Hacked OutputFormat"*

As described in [0], one solution is to write a custom OutputFormat. The
key is emitted only with the first value, then only "null":

public void reduce(Text key, Iterator<Text> values,
    OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
  boolean firstKey = true;
  while (values.hasNext()) {
    // it's possible to call this N times...
    output.collect(firstKey ? key : null, values.next());
    firstKey = false;
  }
}
With this, the OutputFormat can recognize the "line change" and print a "\n".

The nice thing about this approach is that it follows the whole standard
path: Map > ... > Reduce > OutputFormat > HDFS.
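To make the "line change" trick concrete, here is the logic such an OutputFormat's record writer would need, sketched without the Hadoop types as a minimal, self-contained illustration (the class and method names are mine, not from [0]): a non-null key starts a new line, a null key appends another value to the current one.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;

// Mimics the write() logic of the "hacked" RecordWriter:
// non-null key -> start a new line, null key -> append a value.
class LineChangeWriter {
    private final Writer out;
    private boolean lineOpen = false;

    LineChangeWriter(Writer out) {
        this.out = out;
    }

    void write(String key, String value) throws IOException {
        if (key != null) {
            // a new key arrived: close the previous line, start a new one
            if (lineOpen) {
                out.write("\n");
            }
            out.write(key + "\t" + value);
            lineOpen = true;
        } else {
            // same key as before: just append the next value
            out.write(", " + value);
        }
    }

    void close() throws IOException {
        if (lineOpen) {
            out.write("\n");
        }
        out.close();
    }

    public static void main(String[] args) throws IOException {
        StringWriter sw = new StringWriter();
        LineChangeWriter w = new LineChangeWriter(sw);
        w.write("key1", "value1");
        w.write(null, "value2");
        w.write(null, "value3");
        w.write("key2", "value1");
        w.close();
        System.out.print(sw);
    }
}
```

Fed the sequence from the reducer above, this produces one line per key with all values appended, which is exactly the legacy format.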

*b) "Write to HDFS in the reducer"*

As Harsh J mentioned here [1], it is possible to write to HDFS directly in
the reduce phase.
He also gave a link [2] to the Hadoop FAQ, which says it is "possible" to
do that.

With this information I implemented this reducer:

public class UserToAppReducer extends Reducer<Text, Text, Text, Text> {
  private static final int BUFFER_SIZE = 5 * 1024 * 1024;

  private BufferedWriter br;

  @Override
  protected void setup(final Context context) throws IOException, InterruptedException {
    final FileSystem fs = FileSystem.get(context.getConfiguration());
    final Path outputPath = FileOutputFormat.getOutputPath(context);

    // unique file name per task attempt (the tail of this expression was
    // truncated in the archived mail)
    final String fileName = "reducer" + context.getTaskAttemptID().getId()
        + "_" + context.getTaskAttemptID().getTaskID().getId();

    this.br = new BufferedWriter(
        new OutputStreamWriter(fs.create(new Path(outputPath, fileName))), BUFFER_SIZE);
  }

  @Override
  protected void reduce(final Text appId, final Iterable<Text> userIds,
      final Context context) throws IOException, InterruptedException {
    // stream each value straight to HDFS instead of collecting in memory
    this.br.write(appId.toString());
    for (final Text text : userIds) {
      this.br.write('\t');
      this.br.write(text.toString());
    }
    this.br.newLine();
  }

  @Override
  protected void cleanup(final Context context) throws IOException, InterruptedException {
    this.br.close();
  }
}
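The reason option b) survives values larger than the heap is that each value is written the moment it arrives from the shuffle, so only one value is ever held in memory. Stripped of the Hadoop types, the pattern is just this (a self-contained sketch; the names are mine, for illustration only):

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.util.Arrays;

class StreamingRecord {
    // Writes "key\tv1, v2, ..., vN\n" one value at a time.
    // Memory use is constant in N, so N can exceed the heap size.
    static void writeRecord(String key, Iterable<String> values, Writer out)
            throws IOException {
        out.write(key);
        String sep = "\t";
        for (String v : values) {
            out.write(sep);
            out.write(v);
            sep = ", ";   // every value after the first gets a comma
        }
        out.write("\n");
    }

    public static void main(String[] args) throws IOException {
        StringWriter sw = new StringWriter();
        writeRecord("key1", Arrays.asList("value1", "value2", "value3"), sw);
        System.out.print(sw);
    }
}
```

In the real job the `Iterable` is the reducer's `userIds` and the `Writer` is the buffered HDFS stream, so nothing is materialized as a list.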

Both ways run fine, but which approach should I take, and are there better
alternatives?

Thanks a lot.

Best Regards,


P.S.: Sorry for using html :)
