crunch-user mailing list archives

From Mike Barretta <mike.barre...@gmail.com>
Subject Re: Facing error java.io.NotSerializableException: org.apache.hadoop.io.Text
Date Tue, 12 Feb 2013 23:01:57 GMT
Similar to the OP, I'm getting this:
ERROR mr.MRPipeline: org.apache.crunch.impl.mr.run.CrunchRuntimeException:
java.io.NotSerializableException: org.apache.crunch.impl.mr.MRPipeline

...strange

I do have a non-serializable component that assembles my objects from Thrift
objects, but I ended up extending it into an object that is serializable -
WrappedAssembler in this case. function() is a passed-in Groovy closure
which does the actual emitting.

Groovy code is:

        def results = crunchPipeline.read(source).parallelDo(this.class.name + ":" + table,
                new DoFn<Pair<ColumnKey, ColumnDataArrayWritable>, String>() {

            @Override
            void process(Pair<ColumnKey, ColumnDataArrayWritable> input,
                         Emitter<String> emitter) {
                input.second().toArray().each {
                    def obj = new WrappedAssembler().assemble([PetalUtils.toThrift(input.first(), it)])
                    function(obj, emitter)
                }
            }
        }, Writables.strings())
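In case it helps anyone hitting the same exception: the usual cause is that the anonymous DoFn (or a closure it calls, like function() above) implicitly captures the enclosing class, which in turn holds the pipeline. A minimal, self-contained sketch of the mechanism — Pipeline and Driver here are hypothetical stand-ins, not Crunch classes:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Minimal sketch of how a pipeline reference sneaks into a DoFn.
// All names (Pipeline, Driver) are made up for illustration.
public class CaptureDemo {
    // Stand-in for MRPipeline: not Serializable.
    static class Pipeline {}

    // Stand-in for the driver/script class that holds the pipeline.
    static class Driver implements Serializable {
        Pipeline pipeline = new Pipeline(); // non-serializable field

        // An anonymous inner class keeps an implicit reference to the
        // enclosing Driver, so serializing the fn drags the Pipeline along.
        Serializable makeFn() {
            return new Serializable() {
                @Override
                public String toString() {
                    return Driver.this.getClass().getName(); // captures Driver.this
                }
            };
        }
    }

    // Returns the exception thrown while serializing, or null on success.
    static Throwable trySerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return null;
        } catch (IOException e) {
            return e;
        }
    }

    public static void main(String[] args) {
        Throwable t = trySerialize(new Driver().makeFn());
        System.out.println(t); // java.io.NotSerializableException: CaptureDemo$Pipeline
    }
}
```

Making the DoFn a static nested or top-level class (or keeping the pipeline out of any class the DoFn can see) avoids the implicit capture.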


On Thu, Dec 27, 2012 at 1:20 PM, Gabriel Reid <gabriel.reid@gmail.com> wrote:

> Hi Ashish,
>
> The full implementation of this kind of thing (the translation from
> pipeline code into MapReduce jobs) is contained in the packages under
> org.apache.crunch.impl.mr.
>
> Crunch has a single Mapper class (o.a.c.impl.mr.run.CrunchMapper) and
> a single Reducer class (o.a.c.impl.mr.run.CrunchReducer). All DoFns
> are executed within this Mapper and Reducer (plus CrunchCombiner), so
> they are effectively wrappers around the execution of the DoFns (and I
> would say that your understanding is indeed correct).
>
> - Gabriel
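Conceptually, that single wrapper Mapper can be sketched like this (a simplified sketch with made-up names — the real implementation is in org.apache.crunch.impl.mr.run, i.e. CrunchMapper and CrunchReducer):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the "single wrapper Mapper" idea described above.
// SimpleDoFn and WrapperMapper are illustrative names, not Crunch classes.
interface SimpleDoFn<S, T> {
    void initialize();                       // called once per task
    void process(S input, List<T> emitter);  // emitter simplified to a list
}

class WrapperMapper<S, T> {
    private final SimpleDoFn<S, T> fn; // in Crunch, deserialized from the job config;
                                       // several DoFns may be chained inside one task

    WrapperMapper(SimpleDoFn<S, T> fn) {
        this.fn = fn;
        fn.initialize(); // runs after deserialization, before any records
    }

    List<T> map(List<S> split) {
        List<T> out = new ArrayList<>();
        for (S record : split) {
            fn.process(record, out); // every record flows through the same DoFn(s)
        }
        return out;
    }
}
```

The wrapper is generic over the input and output types, which is why one Mapper class can execute any user DoFn.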
>
> On Thu, Dec 27, 2012 at 3:36 AM, Ashish <paliwalashish@gmail.com> wrote:
> > Thanks Gabriel !
> >
> > Where can I look in the Crunch code for this? It's as if Crunch has some
> > wrapper MapReduce functions and, once the complete pipeline graph is
> > built, it decides which functions to run when, passing the params from
> > the Mapper to the DoFn instance. Is this understanding correct?
> >
> >
> > On Wed, Dec 26, 2012 at 10:44 PM, Gabriel Reid <gabriel.reid@gmail.com>
> > wrote:
> >>
> >> Hi Ashish,
> >>
> >> Your solution looks good -- indeed, any non-serializable members are
> >> typically initialized in the initialize method.
> >>
> >> The way Crunch works is that DoFn instances are serialized at the
> >> client, and then deserialized, initialized, and run within map and
> >> reduce tasks. A single map or reduce task will make use of one or more
> >> DoFn instances (i.e. they can be chained together within a single task).
> >>
> >> - Gabriel
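That lifecycle (serialize at the client, then deserialize and initialize in the task) is easy to demonstrate with plain Java serialization. This sketch uses a hypothetical DoFn-like class and shows why transient state has to be rebuilt in initialize():

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical DoFn-like class illustrating the lifecycle described above:
// serialized at the client, deserialized in the task, initialize() before use.
class SketchFn implements Serializable {
    transient StringBuilder buffer; // per-task state; skipped by serialization

    void initialize() {
        buffer = new StringBuilder(); // transient fields come back null, so rebuild here
    }

    // The client-to-task hop, simulated as an in-memory serialization round trip.
    static SketchFn roundTrip(SketchFn fn) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(fn);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SketchFn) in.readObject();
        }
    }
}
```

After the round trip the transient field is null, which is exactly why the transient-plus-initialize pattern in the fixed code further down works.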
> >>
> >>
> >> On 26 Dec 2012, at 15:26, Ashish <paliwalashish@gmail.com> wrote:
> >>
> >> Hi Gabriel,
> >>
> >> Bull's eye :) My code was holding a reference to a non-transient Text
> >> instance.
> >>
> >> Here is the culprit code
> >>
> >> PTable<TextPair, Long> wordCoOccurrence = textFile.parallelDo(new
> >> DoFn<String, Pair<TextPair, Long>>() {
> >>     TextPair textPair = new TextPair();
> >>
> >>     @Override
> >>     public void process(String input, Emitter<Pair<TextPair, Long>> emitter) {
> >>         String[] words = input.split("\\s+");
> >>
> >>         for (int i = 0; i < words.length; i++) {
> >>             String word = words[i];
> >>             if (Strings.isNullOrEmpty(word)) {
> >>                 continue;
> >>             }
> >>
> >>             // let's look for neighbours now
> >>             int start = (i - DEFAULT_NEIGHBOUR_WINDOW < 0)
> >>                     ? 0 : i - DEFAULT_NEIGHBOUR_WINDOW;
> >>             int end = (i + DEFAULT_NEIGHBOUR_WINDOW >= words.length)
> >>                     ? words.length - 1 : i + DEFAULT_NEIGHBOUR_WINDOW;
> >>             for (int j = start; j < end; j++) {
> >>                 if (i == j) continue;
> >>                 textPair.set(new Text(words[i]), new Text(words[j]));
> >>                 emitter.emit(Pair.of(textPair, 1L));
> >>             }
> >>         }
> >>     }
> >> },
> >> textFile.getTypeFamily().tableOf(Writables.writables(TextPair.class),
> >>         Writables.longs()));
> >>
> >> And this is how I fixed it
> >>
> >> PTable<TextPair, Long> wordCoOccurrence = textFile.parallelDo(new
> >> DoFn<String, Pair<TextPair, Long>>() {
> >>     transient TextPair textPair;
> >>
> >>     @Override
> >>     public void initialize() {
> >>         super.initialize();
> >>         textPair = new TextPair();
> >>     }
> >>
> >>     @Override
> >>     public void process(String input, Emitter<Pair<TextPair, Long>> emitter) {
> >>         String[] words = input.split("\\s+");
> >>
> >>         for (int i = 0; i < words.length; i++) {
> >>             String word = words[i];
> >>             if (Strings.isNullOrEmpty(word)) {
> >>                 continue;
> >>             }
> >>
> >>             // let's look for neighbours now
> >>             int start = (i - DEFAULT_NEIGHBOUR_WINDOW < 0)
> >>                     ? 0 : i - DEFAULT_NEIGHBOUR_WINDOW;
> >>             int end = (i + DEFAULT_NEIGHBOUR_WINDOW >= words.length)
> >>                     ? words.length - 1 : i + DEFAULT_NEIGHBOUR_WINDOW;
> >>             for (int j = start; j < end; j++) {
> >>                 if (i == j) continue;
> >>                 textPair.set(new Text(words[i]), new Text(words[j]));
> >>                 emitter.emit(Pair.of(textPair, 1L));
> >>             }
> >>         }
> >>     }
> >> },
> >> textFile.getTypeFamily().tableOf(Writables.writables(TextPair.class),
> >>         Writables.longs()));
> >>
> >> Would you please share how this part is converted to a Hadoop map
> >> function? Does Crunch convert these functions into normal MapReduce
> >> jobs, or is the process more involved? I have to admit I coded this
> >> the way I used to code Mapper functions.
> >>
> >> Appreciate your help.
> >>
> >>
> >> On Wed, Dec 26, 2012 at 7:04 PM, Gabriel Reid <gabriel.reid@gmail.com>
> >> wrote:
> >>>
> >>> Hi Ashish,
> >>>
> >>> Are you holding on to a non-transient Text instance in a DoFn perhaps?
> >>> DoFns need to remain serializable.
> >>>
> >>> Otherwise, could you post your (non-working) code? I'm assuming it's
> >>> pretty short.
> >>>
> >>> - Gabriel
> >>>
> >>>
> >>> On 26 Dec 2012, at 13:54, Ashish <paliwalashish@gmail.com> wrote:
> >>>
> >>> Folks,
> >>>
> >>> I was trying to port the word co-occurrence example (using Pairs) to
> >>> Crunch, using the famous TextPair class from Hadoop: The Definitive
> >>> Guide. While running it, I get this error:
> >>>
> >>> ERROR mr.MRPipeline:
> >>> org.apache.crunch.impl.mr.run.CrunchRuntimeException:
> >>> java.io.NotSerializableException: org.apache.hadoop.io.Text
> >>>
> >>> As an alternative, I created a WordPair class that uses String instead
> >>> of Text and implements Serializable and WritableComparable. That
> >>> version worked.
> >>>
> >>> Is this behavior expected, or am I missing something?
> >>>
> >>>
> >>> --
> >>> thanks
> >>> ashish
> >>>
> >>> Blog: http://www.ashishpaliwal.com/blog
> >>> My Photo Galleries: http://www.pbase.com/ashishpaliwal
> >>
> >>
> >>
> >>
> >
> >
> >
> >
>
