hadoop-mapreduce-user mailing list archives

From Harsh J <ha...@cloudera.com>
Subject Re: Map Reduce Theory Question, getting OutOfMemoryError while reducing
Date Fri, 29 Jun 2012 13:52:06 GMT
Hey Matt,

As far as I can tell, Hadoop isn't really at fault here.

If your issue is that you collect the records in a list before you
store them, focus on that and avoid collecting them entirely. Why not
serialize each record as you receive it, since the incoming order is
already taken care of? As far as I can tell, your AggregateRecords
probably does nothing but serialize the stored LinkedList. So instead
of using a LinkedList, or even a composed Writable such as
AggregateRecords, just write the records out as you receive them from
each .next(). Would this not work for you? You may batch a constant
number of records to gain some write performance, but at least you
won't use up your memory.

You can serialize as you receive by following this:
http://wiki.apache.org/hadoop/FAQ#Can_I_write_create.2BAC8-write-to_hdfs_files_directly_from_map.2BAC8-reduce_tasks.3F
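
For illustration only, a rough sketch of what that could look like with
the new API, assuming WhoKey and LogRecord are Writables and that
key.toString() gives a usable file name (the class name and output
layout below are made up, not your actual code):

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class StreamingWhoReducer
        extends Reducer<WhoKey, LogRecord, WhoKey, LogRecord> {

      @Override
      protected void reduce(WhoKey key, Iterable<LogRecord> values, Context context)
          throws IOException, InterruptedException {
        // Write side-effect files under the task's work directory so that
        // speculative or failed attempts don't clash (see the FAQ entry above).
        Path workDir = FileOutputFormat.getWorkOutputPath(context);
        Path out = new Path(workDir, key.toString());
        FileSystem fs = out.getFileSystem(context.getConfiguration());
        FSDataOutputStream stream = fs.create(out);
        try {
          for (LogRecord record : values) {
            // Serialize each record as it arrives instead of buffering the
            // whole group in a LinkedList.
            record.write(stream);
          }
        } finally {
          stream.close();
        }
      }
    }

The point is that memory use stays roughly constant per key no matter
how many records one person generates; if you want batched writes for
throughput, buffer a small fixed number of records inside the loop and
flush them as you go.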

On Fri, Jun 29, 2012 at 3:07 AM, Berry, Matt <mwberry@amazon.com> wrote:
> I have a MapReduce job that reads in several gigs of log files and separates the records based on who generated them. My MapReduce job looks like this:
>
> InputFormat: NLineInputFormat
> - Reads N lines of an input file, which is an array of URLs for log files to download
> Mapper: <LongWritable, Text, WhoKey, LogRecord>
> - (LongWritable,Text) is coming from the input format. The Text is parsed into an array of URLs. Each log is downloaded and the records extracted
> - WhoKey is just a multipart Key that describes who caused a record to be logged
> - LogRecord is the record that they logged, with all irrelevant information purged
> Reducer: <WhoKey, LogRecord, WhoKey, AggregateRecords>
> - The Reducer iterates through the LogRecords for the WhoKey, adds them to a LinkedList (AggregateRecords), and emits that to the output format
> OutputFormat: <WhoKey, AggregateRecords>
> - Creates a file for each WhoKey and writes the records into it
>
> However I'm running into a problem. When a single person generates an inordinate number of records, all of them have to be held in memory, causing my heap space to run out. I could increase the heap size, but that will not solve the problem, as they could just generate more records and break it again. I've spent a lot of time thinking about how I could alter my setup so that no more than N records are held in memory at a time, but I can't think of a way to do it.
>
> Is there something seriously wrong with how I am processing this? Should I have structured the job in a different way that would avoid this scenario? Isn't the MapReduce framework designed to operate on large data sets? Shouldn't it be managing the heap better?
>
> Stderr and Stack Trace:
>
> 12/06/28 12:10:55 INFO mapred.JobClient:  map 100% reduce 67%
> 12/06/28 12:10:58 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:10:58 INFO mapred.JobClient:  map 100% reduce 69%
> 12/06/28 12:11:01 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:11:01 INFO mapred.JobClient:  map 100% reduce 71%
> 12/06/28 12:11:04 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:11:06 INFO mapred.JobClient:  map 100% reduce 72%
> 12/06/28 12:11:07 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:11:11 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:11:15 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:15:31 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:15:35 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:15:41 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:15:46 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:15:51 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:15:56 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:16:01 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:16:06 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:16:12 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:16:22 INFO mapred.LocalJobRunner: reduce > reduce
> 12/06/28 12:16:44 WARN mapred.LocalJobRunner: job_local_0001
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>        at sun.reflect.GeneratedConstructorAccessor14.newInstance(Unknown Source)
>        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
>        at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
>        at java.lang.Class.newInstance0(Class.java:355)
>        at java.lang.Class.newInstance(Class.java:308)
>        at xxxxxx.xxxxxxxxxxx.xxxx.xxxxxxxx.xxxxxxxxxxxxxxxxxx.readFields(xxxxxxxxxxxxxxxxxx.java:41)
>        at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:67)
>        at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40)
>        at org.apache.hadoop.mapreduce.ReduceContext.nextKeyValue(ReduceContext.java:116)
>        at org.apache.hadoop.mapreduce.ReduceContext$ValueIterator.next(ReduceContext.java:163)
>        at xxxxxx.xxxxxxxxxxx.xxxx.xxxxx.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.reduce(xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.java:34)
>        at xxxxxx.xxxxxxxxxxx.xxxx.xxxxx.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.reduce(xxxxxxxxxxxxxxxxxxxxxxxxxxxxx.java:26)
>        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
>        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:566)
>        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:408)
>        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:216)
>
> P.S. I already used jmap to dump the heap, both to trim each object down to its bare minimum and to confirm there are no slow memory leaks.



-- 
Harsh J
