hadoop-hdfs-user mailing list archives

From Christian Schneider <cschneiderpub...@gmail.com>
Subject Re: OutOfMemory during Plain Java MapReduce
Date Mon, 11 Mar 2013 16:41:10 GMT
Thanks Paul and Harsh for your tips!
I implemented the secondary sort and the related mapper. This is a very good
way to get a unique set.
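
For reference, this is roughly how I wired up the secondary sort. It is only a
sketch from memory; the class and helper names (AppPartitioner,
AppGroupingComparator) are placeholders and may differ from my actual code:

//-----------------------------------------------
// Secondary sort wiring (sketch, names are placeholders)
// The mapper emits (AppAndUserKey(appId, userId), userId) pairs.
//-----------------------------------------------
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Partitioner;

// Composite key: the natural order is (appId, userId), so the reducer sees
// the user ids of one app already sorted and duplicates are adjacent.
public class AppAndUserKey implements WritableComparable<AppAndUserKey> {
    private final Text appId = new Text();
    private final Text userId = new Text();

    public void set(final String app, final String user) {
        this.appId.set(app);
        this.userId.set(user);
    }

    public Text getAppIdText() {
        return this.appId;
    }

    @Override
    public void write(final DataOutput out) throws IOException {
        this.appId.write(out);
        this.userId.write(out);
    }

    @Override
    public void readFields(final DataInput in) throws IOException {
        this.appId.readFields(in);
        this.userId.readFields(in);
    }

    @Override
    public int compareTo(final AppAndUserKey other) {
        final int byApp = this.appId.compareTo(other.appId);
        return byApp != 0 ? byApp : this.userId.compareTo(other.userId);
    }

    // Partition by appId only, so every user of an app goes to the same reducer.
    public static class AppPartitioner extends Partitioner<AppAndUserKey, Text> {
        @Override
        public int getPartition(final AppAndUserKey key, final Text value, final int numPartitions) {
            return (key.appId.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    // Group by appId only, so one reduce() call sees all user ids of an app.
    public static class AppGroupingComparator extends WritableComparator {
        protected AppGroupingComparator() {
            super(AppAndUserKey.class, true);
        }

        @Override
        public int compare(final WritableComparable a, final WritableComparable b) {
            return ((AppAndUserKey) a).appId.compareTo(((AppAndUserKey) b).appId);
        }
    }
}

// In the job setup:
//   job.setPartitionerClass(AppAndUserKey.AppPartitioner.class);
//   job.setGroupingComparatorClass(AppAndUserKey.AppGroupingComparator.class);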

The original question of how to translate the "huge" values (in terms of a
"large" list of users for one key) into the format I need is still somewhat
open.

If the reducer gets this input:
key1, Iterator[value1, value2, value3, ..., valueN]
key2, Iterator[value1, value2, value3, value4, value5, value6, ..., valueN]
...

how do I write it to a text file formatted like this:
key1 value1 value2 value3 ... valueN N
key2 value1 value2 value3 value4 value5 value6 ... valueN N
...

As Harsh suggested in a previous mail, I wrote a reducer that writes to HDFS
directly.
But I still don't know whether this is a good idea or just a workaround.

This is the reducer I came up with:

//-----------------------------------------------
// The Reducer
// It writes to HDFS directly, so no OutputFormat is needed.
//-----------------------------------------------
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Random;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class UserToAppReducer extends Reducer<AppAndUserKey, Text, Text, Text> {
    private static final int BUFFER_SIZE = 5 * 1024 * 1024;

    private BufferedWriter br;

    @Override
    protected void setup(final Context context) throws IOException, InterruptedException {
        final FileSystem fs = FileSystem.get(context.getConfiguration());
        final Path outputPath = FileOutputFormat.getOutputPath(context);

        // One file per reduce attempt, named after the attempt and task ids
        // plus a random suffix to avoid collisions.
        final String fileName = "reducer" + context.getTaskAttemptID().getId() + "_"
                + context.getTaskAttemptID().getTaskID().getId() + "_"
                + new Random(System.currentTimeMillis()).nextInt(10000);

        this.br = new BufferedWriter(
                new OutputStreamWriter(fs.create(new Path(outputPath, fileName))), BUFFER_SIZE);
    }

    @Override
    protected void reduce(final AppAndUserKey appAndUserKey, final Iterable<Text> userIds,
            final Context context) throws IOException, InterruptedException {
        final Text lastUserId = new Text();
        long count = 0;

        this.br.append(appAndUserKey.getAppIdText().toString()).append('\t');

        // The values arrive sorted by user id (secondary sort), so duplicates are adjacent.
        for (final Text text : userIds) {
            if (lastUserId.equals(text))
                continue;

            this.br.append(text.toString()).append('\t');

            count++;
            // Copy the value: Hadoop reuses the same Text instance for every record,
            // so keeping only the reference would make the comparison above always true.
            lastUserId.set(text);
        }

        this.br.append(String.valueOf(count)).append("\n").append('\n');
    }

    @Override
    protected void cleanup(final Context context) throws IOException, InterruptedException {
        this.br.close();
    }
}


Is this the best way to achieve this (with plain MapReduce)?

Or is it better to emit some composite keys and use a custom OutputFormat?
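
If the OutputFormat route is the cleaner one, I imagine it would look roughly
like the sketch below (untested, UserListOutputFormat is a made-up name): the
reducer would emit one (appId, userId) pair per distinct user, and the
RecordWriter would fold consecutive pairs with the same key onto one line and
append the count.

//-----------------------------------------------
// Sketch of a custom OutputFormat (untested)
//-----------------------------------------------
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class UserListOutputFormat extends FileOutputFormat<Text, Text> {

    @Override
    public RecordWriter<Text, Text> getRecordWriter(final TaskAttemptContext context)
            throws IOException, InterruptedException {
        final Path file = getDefaultWorkFile(context, "");
        final FileSystem fs = file.getFileSystem(context.getConfiguration());
        final DataOutputStream out = fs.create(file, false);

        return new RecordWriter<Text, Text>() {
            private Text currentKey = null;
            private long count = 0;

            @Override
            public void write(final Text key, final Text value) throws IOException {
                // Reducer output is sorted, so equal keys arrive consecutively.
                if (this.currentKey == null || !this.currentKey.equals(key)) {
                    finishLine();
                    this.currentKey = new Text(key);
                    out.write(key.getBytes(), 0, key.getLength());
                }
                out.writeBytes("\t");
                out.write(value.getBytes(), 0, value.getLength());
                this.count++;
            }

            // Terminate the current key's line with the number of values seen.
            private void finishLine() throws IOException {
                if (this.currentKey != null) {
                    out.writeBytes("\t" + this.count + "\n");
                    this.count = 0;
                }
            }

            @Override
            public void close(final TaskAttemptContext ctx) throws IOException {
                finishLine();
                out.close();
            }
        };
    }
}

This relies on the reduce output arriving in key order (which it does within a
single reducer), so that all values of one key reach the RecordWriter
consecutively.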


Thanks a lot!

Best Regards,
Christian.


2013/3/9 Harsh J <harsh@cloudera.com>

> Paul's way is much easier than the serialization way I mentioned
> earlier. I didn't pay attention to the logic used but just
> the implementation, my bad :)
>
> On Fri, Mar 8, 2013 at 5:39 PM, Paul Wilkinson <paulw@cloudera.com> wrote:
> > As always, what Harsh said :)
> >
> > Looking at your reducer code, it appears that you are trying to compute
> > the distinct set of user IDs for a given reduce key. Rather than
> > computing this by holding the set in memory, use a secondary sort of the
> > reduce values, then while iterating over the reduce values, look for
> > changes of user id. Whenever it changes, write out the key and the newly
> > found value.
> >
> > Your output will change from this:
> >
> >   key, [value 1, value2, ... valueN]
> >
> > to this:
> >
> >   key, value1
> >   key, value2
> >        ...
> >   key, valueN
> >
> > Whether this is suitable for your follow-on processing is the next
> > question, but this approach will scale to whatever data you can throw at
> > it.
> >
> > Paul
> >
> >
> > On 8 March 2013 10:57, Harsh J <harsh@cloudera.com> wrote:
> >>
> >> Hi,
> >>
> >> When you implement code that starts memory-storing value copies for
> >> every record (even if of just a single key), things are going to break
> >> in big-data-land. Practically, post-partitioning, the # of values for
> >> a given key can be huge given the source data, so you cannot hold it
> >> all in and then write in one go. You'd probably need to write out
> >> something continuously if you really really want to do this, or use an
> >> alternative form of key-value storage where updates can be made
> >> incrementally (Apache HBase is such a store, as one example).
> >>
> >> This has been discussed before IIRC, and if the goal were to store the
> >> outputs onto a file then it's better to just directly serialize them
> >> with a file opened instead of keeping it in a data structure and
> >> serializing it at the end. The caveats that'd apply if you were to
> >> open your own file from a task are described at
> >> http://wiki.apache.org/hadoop/FAQ#Can_I_write_create.2BAC8-write-to_hdfs_files_directly_from_map.2BAC8-reduce_tasks.3F
> >>
> >> On Fri, Mar 8, 2013 at 4:35 AM, Christian Schneider
> >> <cschneiderpublic@gmail.com> wrote:
> >> > I had a look at the stack trace and it says the problem is at the
> >> > reducer:
> >> > userSet.add(iterator.next().toString());
> >> >
> >> > Error: Java heap space
> >> > attempt_201303072200_0016_r_000002_0: WARN : mapreduce.Counters - Group
> >> > org.apache.hadoop.mapred.Task$Counter is deprecated. Use
> >> > org.apache.hadoop.mapreduce.TaskCounter instead
> >> > attempt_201303072200_0016_r_000002_0: WARN :
> >> > org.apache.hadoop.conf.Configuration - session.id is deprecated.
> >> > Instead,
> >> > use dfs.metrics.session-id
> >> > attempt_201303072200_0016_r_000002_0: WARN :
> >> > org.apache.hadoop.conf.Configuration - slave.host.name is deprecated.
> >> > Instead, use dfs.datanode.hostname
> >> > attempt_201303072200_0016_r_000002_0: FATAL:
> >> > org.apache.hadoop.mapred.Child
> >> > - Error running child : java.lang.OutOfMemoryError: Java heap space
> >> > attempt_201303072200_0016_r_000002_0: at
> >> > java.util.Arrays.copyOfRange(Arrays.java:3209)
> >> > attempt_201303072200_0016_r_000002_0: at
> >> > java.lang.String.<init>(String.java:215)
> >> > attempt_201303072200_0016_r_000002_0: at
> >> > java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:542)
> >> > attempt_201303072200_0016_r_000002_0: at
> >> > java.nio.CharBuffer.toString(CharBuffer.java:1157)
> >> > attempt_201303072200_0016_r_000002_0: at
> >> > org.apache.hadoop.io.Text.decode(Text.java:394)
> >> > attempt_201303072200_0016_r_000002_0: at
> >> > org.apache.hadoop.io.Text.decode(Text.java:371)
> >> > attempt_201303072200_0016_r_000002_0: at
> >> > org.apache.hadoop.io.Text.toString(Text.java:273)
> >> > attempt_201303072200_0016_r_000002_0: at
> >> > com.myCompany.UserToAppReducer.reduce(RankingReducer.java:21)
> >> > attempt_201303072200_0016_r_000002_0: at
> >> > com.myCompany.UserToAppReducer.reduce(RankingReducer.java:1)
> >> > attempt_201303072200_0016_r_000002_0: at
> >> > org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:164)
> >> > attempt_201303072200_0016_r_000002_0: at
> >> > org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:610)
> >> > attempt_201303072200_0016_r_000002_0: at
> >> > org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:444)
> >> > attempt_201303072200_0016_r_000002_0: at
> >> > org.apache.hadoop.mapred.Child$4.run(Child.java:268)
> >> > attempt_201303072200_0016_r_000002_0: at
> >> > java.security.AccessController.doPrivileged(Native Method)
> >> > attempt_201303072200_0016_r_000002_0: at
> >> > javax.security.auth.Subject.doAs(Subject.java:396)
> >> > attempt_201303072200_0016_r_000002_0: at
> >> > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
> >> > attempt_201303072200_0016_r_000002_0: at
> >> > org.apache.hadoop.mapred.Child.main(Child.java:262)
> >> >
> >> > But how to solve this?
> >> >
> >> >
> >> > 2013/3/7 Christian Schneider <cschneiderpublic@gmail.com>
> >> >>
> >> >> Hi,
> >> >> during the reduce phase or afterwards (I don't really know how to
> >> >> debug it) I get a heap OutOfMemory exception.
> >> >>
> >> >> I guess this is because the value of the reduce task (a custom
> >> >> Writable) holds a List with a lot of user ids.
> >> >> The setup is quite simple. These are the related classes I used:
> >> >>
> >> >> //-----------------------------------------------
> >> >> // The Reducer
> >> >> // It just adds all userIds of the Iterable to the UserSetWritable
> >> >> //-----------------------------------------------
> >> >> public class UserToAppReducer extends Reducer<Text, Text, Text, UserSetWritable> {
> >> >>
> >> >>     @Override
> >> >>     protected void reduce(final Text appId, final Iterable<Text> userIds,
> >> >>             final Context context) throws IOException, InterruptedException {
> >> >>         final UserSetWritable userSet = new UserSetWritable();
> >> >>
> >> >>         final Iterator<Text> iterator = userIds.iterator();
> >> >>         while (iterator.hasNext()) {
> >> >>             userSet.add(iterator.next().toString());
> >> >>         }
> >> >>
> >> >>         context.write(appId, userSet);
> >> >>     }
> >> >> }
> >> >>
> >> >> //-----------------------------------------------
> >> >> // The custom Writable
> >> >> // Needed to implement its own toString() method to bring the output
> >> >> // into the right format. Maybe I can also do this with my own
> >> >> // OutputFormat class.
> >> >> //-----------------------------------------------
> >> >> public class UserSetWritable implements Writable {
> >> >>     private final Set<String> userIds = new HashSet<String>();
> >> >>
> >> >>     public void add(final String userId) {
> >> >>         this.userIds.add(userId);
> >> >>     }
> >> >>
> >> >>     @Override
> >> >>     public void write(final DataOutput out) throws IOException {
> >> >>         out.writeInt(this.userIds.size());
> >> >>         for (final String userId : this.userIds) {
> >> >>             out.writeUTF(userId);
> >> >>         }
> >> >>     }
> >> >>
> >> >>     @Override
> >> >>     public void readFields(final DataInput in) throws IOException {
> >> >>         final int size = in.readInt();
> >> >>         for (int i = 0; i < size; i++) {
> >> >>             final String readUTF = in.readUTF();
> >> >>             this.userIds.add(readUTF);
> >> >>         }
> >> >>     }
> >> >>
> >> >>     @Override
> >> >>     public String toString() {
> >> >>         String result = "";
> >> >>         for (final String userId : this.userIds) {
> >> >>             result += userId + "\t";
> >> >>         }
> >> >>
> >> >>         result += this.userIds.size();
> >> >>         return result;
> >> >>     }
> >> >> }
> >> >>
> >> >> As OutputFormat I used the default TextOutputFormat.
> >> >>
> >> >> A potential problem could be that a reducer is going to write files
> >> >> larger than 600MB while our mapred.child.java.opts is set to ~380MB.
> >> >> I dug deeper into the TextOutputFormat and saw that the
> >> >> HdfsDataOutputStream does not implement .flush(), and .flush() is
> >> >> also not used in TextOutputFormat. Does this mean that the whole
> >> >> file is kept in RAM and only persisted at the end of processing?
> >> >> And of course, that would lead to the exception.
> >> >>
> >> >> With Pig I am able to query the same data, even with only one reducer.
> >> >> But I have a bet to make it faster with plain MapReduce :)
> >> >>
> >> >> Could you help me debug this and maybe point me in the right direction?
> >> >>
> >> >> Best Regards,
> >> >> Christian.
> >> >
> >> >
> >>
> >>
> >>
> >> --
> >> Harsh J
> >
> >
>
>
>
> --
> Harsh J
>
