hadoop-mapreduce-user mailing list archives

From Paul Wilkinson <pa...@cloudera.com>
Subject Re: OutOfMemory during Plain Java MapReduce
Date Fri, 08 Mar 2013 12:09:32 GMT
As always, what Harsh said :)

Looking at your reducer code, it appears that you are trying to compute the distinct set of
user IDs for a given reduce key. Rather than computing this by holding the set in memory,
use a secondary sort on the reduce values; then, while iterating over them, look
for a change of user ID. Whenever it changes, write out the key and the newly found value.

Your output will change from this:

  key, [value1, value2, ... valueN]

to this:

  key, value1
  key, value2
       ...
  key, valueN

Whether this is suitable for your follow-on processing is the next question, but this approach
will scale to whatever data you can throw at it.
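
For illustration, here is a minimal sketch of such a reducer. It is not from the original job: the class name is hypothetical, and it assumes the job is already configured for a secondary sort (composite key with a partitioner and grouping comparator on the app ID) so that the user IDs for each key arrive sorted; it also keeps plain Text keys for brevity.

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Emits one (appId, userId) pair per distinct user ID without holding the
// whole set in memory. Relies on the values arriving sorted by user ID, so a
// change in value marks a new distinct ID.
public class DistinctUserReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(final Text appId, final Iterable<Text> userIds,
            final Context context) throws IOException, InterruptedException {
        String previous = null;
        for (final Text userId : userIds) {
            final String current = userId.toString();
            if (!current.equals(previous)) {
                // New distinct user ID: write it out immediately.
                context.write(appId, userId);
                previous = current;
            }
        }
    }
}

Only the previous user ID is retained between iterations, so memory use stays constant no matter how many values a key has.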

Paul


On 8 March 2013 10:57, Harsh J <harsh@cloudera.com> wrote:
> Hi,
> 
> When you implement code that starts memory-storing value copies for
> every record (even if just for a single key), things are going to break
> in big-data-land. Practically, post-partitioning, the # of values for
> a given key can be huge given the source data, so you cannot hold them
> all in memory and then write in one go. You'd probably need to write out
> something continuously if you really really want to do this, or use an
> alternative form of key-value storage where updates can be made
> incrementally (Apache HBase is one such store, for example).
> 
> This has been discussed before IIRC, and if the goal is to store the
> outputs in a file then it's better to just serialize them directly
> to an open file instead of keeping them in a data structure and
> serializing them at the end. The caveats that apply if you were to
> open your own file from a task are described at
> http://wiki.apache.org/hadoop/FAQ#Can_I_write_create.2BAC8-write-to_hdfs_files_directly_from_map.2BAC8-reduce_tasks.3F.
> 
> On Fri, Mar 8, 2013 at 4:35 AM, Christian Schneider
> <cschneiderpublic@gmail.com> wrote:
> > I had a look at the stack trace and it says the problem is at the reducer:
> > userSet.add(iterator.next().toString());
> >
> > Error: Java heap space
> > attempt_201303072200_0016_r_000002_0: WARN : mapreduce.Counters - Group
> > org.apache.hadoop.mapred.Task$Counter is deprecated. Use
> > org.apache.hadoop.mapreduce.TaskCounter instead
> > attempt_201303072200_0016_r_000002_0: WARN :
> > org.apache.hadoop.conf.Configuration - session.id is deprecated. Instead,
> > use dfs.metrics.session-id
> > attempt_201303072200_0016_r_000002_0: WARN :
> > org.apache.hadoop.conf.Configuration - slave.host.name is deprecated.
> > Instead, use dfs.datanode.hostname
> > attempt_201303072200_0016_r_000002_0: FATAL: org.apache.hadoop.mapred.Child
> > - Error running child : java.lang.OutOfMemoryError: Java heap space
> > attempt_201303072200_0016_r_000002_0: at
> > java.util.Arrays.copyOfRange(Arrays.java:3209)
> > attempt_201303072200_0016_r_000002_0: at
> > java.lang.String.<init>(String.java:215)
> > attempt_201303072200_0016_r_000002_0: at
> > java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:542)
> > attempt_201303072200_0016_r_000002_0: at
> > java.nio.CharBuffer.toString(CharBuffer.java:1157)
> > attempt_201303072200_0016_r_000002_0: at
> > org.apache.hadoop.io.Text.decode(Text.java:394)
> > attempt_201303072200_0016_r_000002_0: at
> > org.apache.hadoop.io.Text.decode(Text.java:371)
> > attempt_201303072200_0016_r_000002_0: at
> > org.apache.hadoop.io.Text.toString(Text.java:273)
> > attempt_201303072200_0016_r_000002_0: at
> > com.myCompany.UserToAppReducer.reduce(RankingReducer.java:21)
> > attempt_201303072200_0016_r_000002_0: at
> > com.myCompany.UserToAppReducer.reduce(RankingReducer.java:1)
> > attempt_201303072200_0016_r_000002_0: at
> > org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:164)
> > attempt_201303072200_0016_r_000002_0: at
> > org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:610)
> > attempt_201303072200_0016_r_000002_0: at
> > org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:444)
> > attempt_201303072200_0016_r_000002_0: at
> > org.apache.hadoop.mapred.Child$4.run(Child.java:268)
> > attempt_201303072200_0016_r_000002_0: at
> > java.security.AccessController.doPrivileged(Native Method)
> > attempt_201303072200_0016_r_000002_0: at
> > javax.security.auth.Subject.doAs(Subject.java:396)
> > attempt_201303072200_0016_r_000002_0: at
> > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
> > attempt_201303072200_0016_r_000002_0: at
> > org.apache.hadoop.mapred.Child.main(Child.java:262)
> >
> > But how do I solve this?
> >
> >
> > 2013/3/7 Christian Schneider <cschneiderpublic@gmail.com>
> >>
> >> Hi,
> >> During the reduce phase or afterwards (I don't really know how to debug
> >> it) I get a Java heap space OutOfMemoryError.
> >>
> >> I guess this is because the value of the reduce task (a custom Writable)
> >> holds a set with a lot of user IDs.
> >> The setup is quite simple. These are the related classes I used:
> >>
> >> //-----------------------------------------------
> >> // The Reducer
> >> // It just adds all userIds of the Iterable to the UserSetWritable
> >> //-----------------------------------------------
> >> import java.io.IOException;
> >> import java.util.Iterator;
> >>
> >> import org.apache.hadoop.io.Text;
> >> import org.apache.hadoop.mapreduce.Reducer;
> >>
> >> public class UserToAppReducer extends Reducer<Text, Text, Text, UserSetWritable> {
> >>
> >>     @Override
> >>     protected void reduce(final Text appId, final Iterable<Text> userIds,
> >>             final Context context) throws IOException, InterruptedException {
> >>         // Collects every user ID for this app ID into a single in-memory set.
> >>         final UserSetWritable userSet = new UserSetWritable();
> >>
> >>         final Iterator<Text> iterator = userIds.iterator();
> >>         while (iterator.hasNext()) {
> >>             userSet.add(iterator.next().toString());
> >>         }
> >>
> >>         context.write(appId, userSet);
> >>     }
> >> }
> >>
> >> //-----------------------------------------------
> >> // The Custom Writable
> >> // Needed to implement my own toString() method to bring the output into
> >> // the right format. Maybe I can do this also with my own OutputFormat class.
> >> //-----------------------------------------------
> >> import java.io.DataInput;
> >> import java.io.DataOutput;
> >> import java.io.IOException;
> >> import java.util.HashSet;
> >> import java.util.Set;
> >>
> >> import org.apache.hadoop.io.Writable;
> >>
> >> public class UserSetWritable implements Writable {
> >>     private final Set<String> userIds = new HashSet<String>();
> >>
> >>     public void add(final String userId) {
> >>         this.userIds.add(userId);
> >>     }
> >>
> >>     @Override
> >>     public void write(final DataOutput out) throws IOException {
> >>         out.writeInt(this.userIds.size());
> >>         for (final String userId : this.userIds) {
> >>             out.writeUTF(userId);
> >>         }
> >>     }
> >>
> >>     @Override
> >>     public void readFields(final DataInput in) throws IOException {
> >>         // Clear any previous contents, since Hadoop may reuse Writable instances.
> >>         this.userIds.clear();
> >>         final int size = in.readInt();
> >>         for (int i = 0; i < size; i++) {
> >>             final String readUTF = in.readUTF();
> >>             this.userIds.add(readUTF);
> >>         }
> >>     }
> >>
> >>     @Override
> >>     public String toString() {
> >>         // Tab-separated user IDs, followed by the count.
> >>         String result = "";
> >>         for (final String userId : this.userIds) {
> >>             result += userId + "\t";
> >>         }
> >>
> >>         result += this.userIds.size();
> >>         return result;
> >>     }
> >> }
> >>
> >> As OutputFormat I used the default TextOutputFormat.
> >>
> >> A potential problem could be that a reduce task is going to write files >600MB
> >> and our mapred.child.java.opts is set to ~380MB.
> >> I dug deeper into the TextOutputFormat and saw that the
> >> HdfsDataOutputStream does not implement .flush(),
> >> and .flush() is also not used in TextOutputFormat. Does this mean that the
> >> whole file is kept in RAM and only persisted at the end of processing?
> >> And of course, that would lead to the exception.
> >>
> >> With Pig I am able to query the same data, even with one reducer only.
> >> But I have a bet to make it faster with plain MapReduce :)
> >>
> >> Could you help me debug this and maybe point me in the right
> >> direction?
> >>
> >> Best Regards,
> >> Christian.
> >
> >
> 
> 
> 
> --
> Harsh J
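
For illustration of the stream-to-a-file alternative Harsh describes above, here is a minimal sketch (not from the original thread; the class name and output path are hypothetical) of a reducer that writes each record to a per-attempt side file instead of buffering anything on the heap:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Streams each (appId, userId) record straight to a side file in HDFS instead
// of collecting the values in memory. The output directory is hypothetical.
public class SideFileReducer extends Reducer<Text, Text, Text, Text> {

    private FSDataOutputStream out;

    @Override
    protected void setup(final Context context) throws IOException {
        final Configuration conf = context.getConfiguration();
        final FileSystem fs = FileSystem.get(conf);
        // The task attempt ID keeps the file name unique, so retried or
        // speculative attempts do not clobber each other.
        final Path sideFile = new Path("/user/example/side-output/"
                + context.getTaskAttemptID());
        out = fs.create(sideFile);
    }

    @Override
    protected void reduce(final Text appId, final Iterable<Text> userIds,
            final Context context) throws IOException, InterruptedException {
        for (final Text userId : userIds) {
            // Write each record as it is consumed; nothing accumulates on the heap.
            out.write((appId + "\t" + userId + "\n").getBytes("UTF-8"));
        }
    }

    @Override
    protected void cleanup(final Context context) throws IOException {
        out.close();
    }
}

The attempt-specific file name addresses the main caveat from the FAQ entry Harsh linked: more than one attempt of the same task may run (retries, speculative execution), so each needs its own output, and files left behind by failed attempts still need separate cleanup.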

