crunch-user mailing list archives

From Josh Wills <jwi...@cloudera.com>
Subject Re: Facing error java.io.NotSerializableException: org.apache.hadoop.io.Text
Date Tue, 12 Feb 2013 23:21:36 GMT
Closures can be a little tricky to serialize, because it's not clear how much
of their surrounding state they need to serialize as part of themselves. I
don't know enough Groovy to know how best to handle that.
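
In plain Java the equivalent problem is easy to demonstrate: an anonymous
class keeps an implicit reference to its enclosing instance, so the whole
enclosing object has to be serializable too. A minimal sketch (class and
field names here are made up for illustration):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    public class CaptureDemo {          // note: CaptureDemo is NOT Serializable
        interface Fn extends Serializable { void run(); }

        String table = "t";

        Fn makeFn() {
            // The anonymous inner class holds an implicit CaptureDemo.this
            // reference (it needs it to read 'table'), so serializing the Fn
            // drags the whole enclosing object along and fails with
            // java.io.NotSerializableException: CaptureDemo.
            return new Fn() {
                public void run() { System.out.println(table); }
            };
        }

        // The usual fix: a static nested (or top-level) class that captures
        // nothing from the enclosing instance.
        static class StandaloneFn implements Fn {
            public void run() { System.out.println("t"); }
        }

        public static void main(String[] args) throws IOException {
            ObjectOutputStream out =
                    new ObjectOutputStream(new ByteArrayOutputStream());
            out.writeObject(new StandaloneFn());         // fine
            out.writeObject(new CaptureDemo().makeFn()); // throws
        }
    }

Groovy closures capture their owner in a similar way, which is presumably how
the MRPipeline reference is getting pulled in below.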


On Tue, Feb 12, 2013 at 3:01 PM, Mike Barretta <mike.barretta@gmail.com> wrote:

> Similar to the OP, I'm getting this:
> ERROR mr.MRPipeline: org.apache.crunch.impl.mr.run.CrunchRuntimeException:
> java.io.NotSerializableException: org.apache.crunch.impl.mr.MRPipeline
>
> ...strange
>
> I do have a non-serializable component that assembles my objects from
> Thrift objects, but I ended up extending it into an object that is
> serializable - WrappedAssembler in this case. function() is a passed-in
> Groovy closure which does the actual emitting.
>
> Groovy code is:
>
>     def results = crunchPipeline.read(source).parallelDo(
>             this.class.name + ":" + table,
>             new DoFn<Pair<ColumnKey, ColumnDataArrayWritable>, String>() {
>                 @Override
>                 void process(Pair<ColumnKey, ColumnDataArrayWritable> input,
>                              Emitter<String> emitter) {
>                     input.second().toArray().each {
>                         def obj = new WrappedAssembler().assemble(
>                                 [PetalUtils.toThrift(input.first(), it)])
>                         function(obj, emitter)
>                     }
>                 }
>             }, Writables.strings())
>
>
> On Thu, Dec 27, 2012 at 1:20 PM, Gabriel Reid <gabriel.reid@gmail.com> wrote:
>
>> Hi Ashish,
>>
>> The full implementation of this kind of thing (the translation from
>> pipeline code into MapReduce jobs) is contained in the packages under
>> org.apache.crunch.impl.mr.
>>
>> Crunch has a single Mapper class (o.a.c.impl.mr.run.CrunchMapper) and
>> a single Reducer class (o.a.c.impl.mr.run.CrunchReducer). All DoFns
>> are executed within this Mapper and Reducer (plus CrunchCombiner), so
>> they are effectively wrappers around the execution of the DoFns (and I
>> would say that your understanding is indeed correct).
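>>
>> Conceptually, the wrapping works something like the following (just a toy
>> sketch of the chaining idea -- the class and method names here are made up,
>> and the real CrunchMapper code is more involved):
>>
>>     import java.util.List;
>>
>>     import org.apache.crunch.DoFn;
>>     import org.apache.crunch.Emitter;
>>
>>     // Each DoFn emits into an Emitter that simply invokes the next DoFn,
>>     // and the last Emitter in the chain writes to the MapReduce context.
>>     public class DoFnChain {
>>         @SuppressWarnings({"rawtypes", "unchecked"})
>>         public static void run(Object input, List<DoFn> fns, Emitter sink) {
>>             Emitter next = sink;
>>             // wire the chain back to front
>>             for (int i = fns.size() - 1; i >= 0; i--) {
>>                 final DoFn fn = fns.get(i);
>>                 final Emitter downstream = next;
>>                 next = new Emitter() {
>>                     public void emit(Object value) { fn.process(value, downstream); }
>>                     public void flush() { downstream.flush(); }
>>                 };
>>             }
>>             next.emit(input); // push one record through the whole chain
>>         }
>>     }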
>>
>> - Gabriel
>>
>> On Thu, Dec 27, 2012 at 3:36 AM, Ashish <paliwalashish@gmail.com> wrote:
>> > Thanks Gabriel!
>> >
>> > Where can I look in the Crunch code for this? It's like Crunch has some
>> > wrapper MapReduce functions, and once the complete pipeline graph is
>> > built, it decides which functions to run when, passing the params from
>> > the Mapper to the DoFn instance. Is this understanding correct?
>> >
>> >
>> > On Wed, Dec 26, 2012 at 10:44 PM, Gabriel Reid <gabriel.reid@gmail.com>
>> > wrote:
>> >>
>> >> Hi Ashish,
>> >>
>> >> Your solution looks good -- indeed, any non-serializable members are
>> >> typically initialized in the initialize method.
>> >>
>> >> The way Crunch works is that DoFn instances are serialized at the
>> >> client, and then deserialized, initialized, and run within map and
>> >> reduce tasks. A single map or reduce task will make use of one or more
>> >> DoFn instances (i.e. they can be chained together within a single task).
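>> >>
>> >> One consequence is that you can reproduce this class of error without
>> >> running a job at all, by serializing the DoFn yourself. A quick check
>> >> along these lines (just a sketch; the helper name is made up):
>> >>
>> >>     import java.io.ByteArrayOutputStream;
>> >>     import java.io.IOException;
>> >>     import java.io.ObjectOutputStream;
>> >>
>> >>     public class SerializationCheck {
>> >>         // Mimics the client-side step: if this throws
>> >>         // java.io.NotSerializableException, the DoFn will fail the
>> >>         // same way when the pipeline ships it to the tasks.
>> >>         public static void checkSerializable(Object doFn) throws IOException {
>> >>             ObjectOutputStream out =
>> >>                     new ObjectOutputStream(new ByteArrayOutputStream());
>> >>             out.writeObject(doFn);
>> >>             out.close();
>> >>         }
>> >>     }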
>> >>
>> >> - Gabriel
>> >>
>> >>
>> >> On 26 Dec 2012, at 15:26, Ashish <paliwalashish@gmail.com> wrote:
>> >>
>> >> Hi Gabriel,
>> >>
>> >> Bull's eye :) My code was holding a reference to a non-transient Text
>> >> instance.
>> >>
>> >> Here is the culprit code:
>> >>
>> >> PTable<TextPair, Long> wordCoOccurrence = textFile.parallelDo(
>> >>         new DoFn<String, Pair<TextPair, Long>>() {
>> >>             TextPair textPair = new TextPair();
>> >>
>> >>             @Override
>> >>             public void process(String input,
>> >>                     Emitter<Pair<TextPair, Long>> emitter) {
>> >>                 String[] words = input.split("\\s+");
>> >>
>> >>                 for (int i = 0; i < words.length; i++) {
>> >>                     String word = words[i];
>> >>                     if (Strings.isNullOrEmpty(word)) {
>> >>                         continue;
>> >>                     }
>> >>
>> >>                     // let's look for neighbours now
>> >>                     int start = (i - DEFAULT_NEIGHBOUR_WINDOW < 0)
>> >>                             ? 0 : i - DEFAULT_NEIGHBOUR_WINDOW;
>> >>                     int end = (i + DEFAULT_NEIGHBOUR_WINDOW >= words.length)
>> >>                             ? words.length - 1 : i + DEFAULT_NEIGHBOUR_WINDOW;
>> >>                     for (int j = start; j < end; j++) {
>> >>                         if (i == j) continue;
>> >>                         textPair.set(new Text(words[i]), new Text(words[j]));
>> >>                         emitter.emit(Pair.of(textPair, 1L));
>> >>                     }
>> >>                 }
>> >>             }
>> >>         },
>> >>         textFile.getTypeFamily().tableOf(
>> >>                 Writables.writables(TextPair.class), Writables.longs()));
>> >>
>> >> And this is how I fixed it:
>> >>
>> >> PTable<TextPair, Long> wordCoOccurrence = textFile.parallelDo(
>> >>         new DoFn<String, Pair<TextPair, Long>>() {
>> >>             transient TextPair textPair;
>> >>
>> >>             @Override
>> >>             public void initialize() {
>> >>                 super.initialize();
>> >>                 textPair = new TextPair();
>> >>             }
>> >>
>> >>             @Override
>> >>             public void process(String input,
>> >>                     Emitter<Pair<TextPair, Long>> emitter) {
>> >>                 String[] words = input.split("\\s+");
>> >>
>> >>                 for (int i = 0; i < words.length; i++) {
>> >>                     String word = words[i];
>> >>                     if (Strings.isNullOrEmpty(word)) {
>> >>                         continue;
>> >>                     }
>> >>
>> >>                     // let's look for neighbours now
>> >>                     int start = (i - DEFAULT_NEIGHBOUR_WINDOW < 0)
>> >>                             ? 0 : i - DEFAULT_NEIGHBOUR_WINDOW;
>> >>                     int end = (i + DEFAULT_NEIGHBOUR_WINDOW >= words.length)
>> >>                             ? words.length - 1 : i + DEFAULT_NEIGHBOUR_WINDOW;
>> >>                     for (int j = start; j < end; j++) {
>> >>                         if (i == j) continue;
>> >>                         textPair.set(new Text(words[i]), new Text(words[j]));
>> >>                         emitter.emit(Pair.of(textPair, 1L));
>> >>                     }
>> >>                 }
>> >>             }
>> >>         },
>> >>         textFile.getTypeFamily().tableOf(
>> >>                 Writables.writables(TextPair.class), Writables.longs()));
>> >>
>> >> Would you please share how this part is converted to a Hadoop map
>> >> function? Does Crunch convert these functions to normal MapReduce jobs,
>> >> or is the process more involved? I have to admit I coded this the way I
>> >> used to code Mapper functions.
>> >>
>> >> Appreciate your help.
>> >>
>> >>
>> >> On Wed, Dec 26, 2012 at 7:04 PM, Gabriel Reid <gabriel.reid@gmail.com>
>> >> wrote:
>> >>>
>> >>> Hi Ashish,
>> >>>
>> >>> Are you holding on to a non-transient Text instance in a DoFn perhaps?
>> >>> DoFns need to remain serializable.
>> >>>
>> >>> Otherwise, could you post your (non-working) code? (I'm assuming it's
>> >>> pretty short.)
>> >>>
>> >>> - Gabriel
>> >>>
>> >>>
>> >>> On 26 Dec 2012, at 13:54, Ashish <paliwalashish@gmail.com> wrote:
>> >>>
>> >>> Folks,
>> >>>
>> >>> I was trying to port the word co-occurrence example (using Pairs) to
>> >>> Crunch, using the famous TextPair class from the Hadoop Definitive
>> >>> Guide. While running it I get this error:
>> >>>
>> >>> ERROR mr.MRPipeline:
>> >>> org.apache.crunch.impl.mr.run.CrunchRuntimeException:
>> >>> java.io.NotSerializableException: org.apache.hadoop.io.Text
>> >>>
>> >>> As an alternative, I created a WordPair class that uses String instead
>> >>> of Text and implements Serializable and WritableComparable. This piece
>> >>> worked.
>> >>>
>> >>> Is this behavior expected, or am I missing something?
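>> >>>
>> >>> For reference, such a String-based pair looks roughly like this (a
>> >>> minimal sketch with illustrative names; the actual class may differ):
>> >>>
>> >>>     import java.io.DataInput;
>> >>>     import java.io.DataOutput;
>> >>>     import java.io.IOException;
>> >>>     import java.io.Serializable;
>> >>>
>> >>>     import org.apache.hadoop.io.WritableComparable;
>> >>>
>> >>>     public class WordPair
>> >>>             implements WritableComparable<WordPair>, Serializable {
>> >>>         private String first = "";
>> >>>         private String second = "";
>> >>>
>> >>>         public WordPair() { }  // no-arg constructor required by Writable
>> >>>
>> >>>         public void set(String first, String second) {
>> >>>             this.first = first;
>> >>>             this.second = second;
>> >>>         }
>> >>>
>> >>>         @Override
>> >>>         public void write(DataOutput out) throws IOException {
>> >>>             out.writeUTF(first);
>> >>>             out.writeUTF(second);
>> >>>         }
>> >>>
>> >>>         @Override
>> >>>         public void readFields(DataInput in) throws IOException {
>> >>>             first = in.readUTF();
>> >>>             second = in.readUTF();
>> >>>         }
>> >>>
>> >>>         @Override
>> >>>         public int compareTo(WordPair o) {
>> >>>             int cmp = first.compareTo(o.first);
>> >>>             return cmp != 0 ? cmp : second.compareTo(o.second);
>> >>>         }
>> >>>
>> >>>         @Override
>> >>>         public boolean equals(Object o) {
>> >>>             return o instanceof WordPair
>> >>>                     && first.equals(((WordPair) o).first)
>> >>>                     && second.equals(((WordPair) o).second);
>> >>>         }
>> >>>
>> >>>         @Override
>> >>>         public int hashCode() {
>> >>>             return first.hashCode() * 31 + second.hashCode();
>> >>>         }
>> >>>     }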
>> >>>
>> >>>
>> >>> --
>> >>> thanks
>> >>> ashish
>> >>>
>> >>> Blog: http://www.ashishpaliwal.com/blog
>> >>> My Photo Galleries: http://www.pbase.com/ashishpaliwal
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >> thanks
>> >> ashish
>> >>
>> >> Blog: http://www.ashishpaliwal.com/blog
>> >> My Photo Galleries: http://www.pbase.com/ashishpaliwal
>> >
>> >
>> >
>> >
>> > --
>> > thanks
>> > ashish
>> >
>> > Blog: http://www.ashishpaliwal.com/blog
>> > My Photo Galleries: http://www.pbase.com/ashishpaliwal
>>
>
>


-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
