incubator-crunch-user mailing list archives

From: Gabriel Reid <gabriel.r...@gmail.com>
Subject: Re: Facing error java.io.NotSerializableException: org.apache.hadoop.io.Text
Date: Thu, 27 Dec 2012 18:20:41 GMT
Hi Ashish,

The full implementation of this kind of thing (the translation from
pipeline code into MapReduce jobs) is contained in the packages under
org.apache.crunch.impl.mr.

Crunch has a single Mapper class (o.a.c.impl.mr.run.CrunchMapper) and
a single Reducer class (o.a.c.impl.mr.run.CrunchReducer). All DoFns
are executed within this Mapper and Reducer (plus a CrunchCombiner), so
these classes are effectively wrappers around the execution of the DoFns
(and I would say that your understanding is indeed correct).
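
To make that concrete, here is a rough conceptual sketch of what such a
wrapper Mapper looks like. This is not the actual Crunch source -- the Stage
interface and the readStagesFromConfig() helper below are just placeholders
-- but it shows the idea: the planned chain of DoFns is deserialized and
initialized in setup(), and every input record is pushed through that chain
in map():

import java.util.List;

import org.apache.hadoop.mapreduce.Mapper;

// Rough conceptual sketch only -- NOT the real CrunchMapper. The Stage
// interface and readStagesFromConfig() helper are made up for illustration.
public class SketchWrapperMapper extends Mapper<Object, Object, Object, Object> {

  /** Stand-in for a deserialized DoFn in the chain planned for this task. */
  interface Stage {
    void initialize();
    void process(Object input);  // emits to the next stage, or to the MR context
    void cleanup();
  }

  private List<Stage> stages;

  @Override
  protected void setup(Context context) {
    // The planner serializes the chain of DoFns into the job configuration;
    // a (placeholder) helper deserializes it here, then each DoFn gets its
    // initialize() call before any records are processed.
    stages = readStagesFromConfig(context);
    for (Stage stage : stages) {
      stage.initialize();
    }
  }

  @Override
  protected void map(Object key, Object value, Context context) {
    // Every input record is pushed into the head of the chain; values emitted
    // by one stage flow into the next, and the final stage writes its output
    // to the MapReduce context.
    stages.get(0).process(value);
  }

  @Override
  protected void cleanup(Context context) {
    for (Stage stage : stages) {
      stage.cleanup();
    }
  }

  private List<Stage> readStagesFromConfig(Context context) {
    // Placeholder for the deserialization logic.
    throw new UnsupportedOperationException("sketch only");
  }
}

The real implementation under org.apache.crunch.impl.mr is more involved,
but the overall shape is the same: one generic Mapper/Reducer pair that
hosts whatever DoFns the planner assigned to that task.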

- Gabriel

On Thu, Dec 27, 2012 at 3:36 AM, Ashish <paliwalashish@gmail.com> wrote:
> Thanks Gabriel !
>
> Where can I look in the Crunch code for this? It seems like Crunch has some
> wrapper MapReduce functions, and once the complete pipeline graph is built,
> it decides which functions to run when, passing the parameters from the
> Mapper to the DoFn instances. Is this understanding correct?
>
>
> On Wed, Dec 26, 2012 at 10:44 PM, Gabriel Reid <gabriel.reid@gmail.com>
> wrote:
>>
>> Hi Ashish,
>>
>> Your solution looks good -- indeed, any non-serializable members are
>> typically initialized in the initialize method.
>>
>> The way Crunch works is that DoFn instances are serialized at the client,
>> and then deserialized, initialized, and run within map and reduce tasks. A
>> single map or reduce task will make use of one or more DoFn instances (i.e.
>> they can be chained together within a single task).
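>>
>> As a small illustration (just a sketch -- the input/output paths and the
>> WordCountApp driver class here are made up), the two parallelDo calls below
>> would normally be fused by the planner and executed inside a single map
>> task, with both DoFns serialized at the client and deserialized on the
>> cluster:
>>
>> Pipeline pipeline = new MRPipeline(WordCountApp.class);
>> PCollection<String> lines = pipeline.readTextFile("/tmp/input");
>>
>> // First DoFn in the chain: upper-case each line.
>> PCollection<String> upper = lines.parallelDo(new DoFn<String, String>() {
>>     @Override
>>     public void process(String input, Emitter<String> emitter) {
>>         emitter.emit(input.toUpperCase());
>>     }
>> }, Writables.strings());
>>
>> // Second DoFn, chained onto the first: trim whitespace. Both DoFns run in
>> // the same map task; no intermediate data is written between them.
>> PCollection<String> trimmed = upper.parallelDo(new DoFn<String, String>() {
>>     @Override
>>     public void process(String input, Emitter<String> emitter) {
>>         emitter.emit(input.trim());
>>     }
>> }, Writables.strings());
>>
>> pipeline.writeTextFile(trimmed, "/tmp/output");
>> pipeline.done();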
>>
>> - Gabriel
>>
>>
>> On 26 Dec 2012, at 15:26, Ashish <paliwalashish@gmail.com> wrote:
>>
>> Hi Gabriel,
>>
>> Bull's eye :) My code was holding a reference to a non-transient Text
>> instance.
>>
>> Here is the culprit code:
>>
>> PTable<TextPair, Long> wordCoOccurrence = textFile.parallelDo(
>>     new DoFn<String, Pair<TextPair, Long>>() {
>>         TextPair textPair = new TextPair();
>>
>>         @Override
>>         public void process(String input, Emitter<Pair<TextPair, Long>> emitter) {
>>             String[] words = input.split("\\s+");
>>
>>             for (int i = 0; i < words.length; i++) {
>>                 String word = words[i];
>>                 if (Strings.isNullOrEmpty(word)) {
>>                     continue;
>>                 }
>>
>>                 // lets look for neighbours now
>>                 int start = (i - DEFAULT_NEIGHBOUR_WINDOW < 0)
>>                         ? 0 : i - DEFAULT_NEIGHBOUR_WINDOW;
>>                 int end = (i + DEFAULT_NEIGHBOUR_WINDOW >= words.length)
>>                         ? words.length - 1 : i + DEFAULT_NEIGHBOUR_WINDOW;
>>                 for (int j = start; j < end; j++) {
>>                     if (i == j) continue;
>>                     textPair.set(new Text(words[i]), new Text(words[j]));
>>                     emitter.emit(Pair.of(textPair, 1L));
>>                 }
>>             }
>>         }
>>     },
>>     textFile.getTypeFamily().tableOf(Writables.writables(TextPair.class), Writables.longs()));
>>
>> And this is how I fixed it:
>>
>> PTable<TextPair, Long> wordCoOccurrence = textFile.parallelDo(
>>     new DoFn<String, Pair<TextPair, Long>>() {
>>         transient TextPair textPair;
>>
>>         @Override
>>         public void initialize() {
>>             super.initialize();
>>             textPair = new TextPair();
>>         }
>>
>>         @Override
>>         public void process(String input, Emitter<Pair<TextPair, Long>> emitter) {
>>             String[] words = input.split("\\s+");
>>
>>             for (int i = 0; i < words.length; i++) {
>>                 String word = words[i];
>>                 if (Strings.isNullOrEmpty(word)) {
>>                     continue;
>>                 }
>>
>>                 // lets look for neighbours now
>>                 int start = (i - DEFAULT_NEIGHBOUR_WINDOW < 0)
>>                         ? 0 : i - DEFAULT_NEIGHBOUR_WINDOW;
>>                 int end = (i + DEFAULT_NEIGHBOUR_WINDOW >= words.length)
>>                         ? words.length - 1 : i + DEFAULT_NEIGHBOUR_WINDOW;
>>                 for (int j = start; j < end; j++) {
>>                     if (i == j) continue;
>>                     textPair.set(new Text(words[i]), new Text(words[j]));
>>                     emitter.emit(Pair.of(textPair, 1L));
>>                 }
>>             }
>>         }
>>     },
>>     textFile.getTypeFamily().tableOf(Writables.writables(TextPair.class), Writables.longs()));
>>
>> Would you please share how this part is converted to a Hadoop map function?
>> Does Crunch convert these functions to normal MapReduce jobs, or is the
>> process more involved? I have to admit I coded this the way I used to code
>> Mapper functions.
>>
>> Appreciate your help.
>>
>>
>> On Wed, Dec 26, 2012 at 7:04 PM, Gabriel Reid <gabriel.reid@gmail.com>
>> wrote:
>>>
>>> Hi Ashish,
>>>
>>> Are you holding on to a non-transient Text instance in a DoFn perhaps?
>>> DoFns need to remain serializable.
>>>
>>> Otherwise, could you post your (non-working) code (I'm assuming it's
>>> pretty short)?
>>>
>>> - Gabriel
>>>
>>>
>>> On 26 Dec 2012, at 13:54, Ashish <paliwalashish@gmail.com> wrote:
>>>
>>> Folks,
>>>
>>> I was trying to port the word co-occurrence example (using Pairs) to
>>> Crunch. I used the famous TextPair class from Hadoop: The Definitive
>>> Guide. While running it, I get this error:
>>>
>>> ERROR mr.MRPipeline:
>>> org.apache.crunch.impl.mr.run.CrunchRuntimeException:
>>> java.io.NotSerializableException: org.apache.hadoop.io.Text
>>>
>>> As an alternative, I created a WordPair class that uses String instead of
>>> Text and implements Serializable and WritableComparable. This worked.
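>>>
>>> Roughly, it looks like this (a sketch from memory, not the exact code):
>>>
>>> import java.io.DataInput;
>>> import java.io.DataOutput;
>>> import java.io.IOException;
>>> import java.io.Serializable;
>>>
>>> import org.apache.hadoop.io.WritableComparable;
>>>
>>> public class WordPair implements WritableComparable<WordPair>, Serializable {
>>>     private String first;
>>>     private String second;
>>>
>>>     public WordPair() { }
>>>
>>>     public void set(String first, String second) {
>>>         this.first = first;
>>>         this.second = second;
>>>     }
>>>
>>>     @Override
>>>     public void write(DataOutput out) throws IOException {
>>>         // String fields keep the whole object Java-serializable, unlike Text.
>>>         out.writeUTF(first);
>>>         out.writeUTF(second);
>>>     }
>>>
>>>     @Override
>>>     public void readFields(DataInput in) throws IOException {
>>>         first = in.readUTF();
>>>         second = in.readUTF();
>>>     }
>>>
>>>     @Override
>>>     public int compareTo(WordPair other) {
>>>         int cmp = first.compareTo(other.first);
>>>         return (cmp != 0) ? cmp : second.compareTo(other.second);
>>>     }
>>>
>>>     @Override
>>>     public boolean equals(Object o) {
>>>         if (!(o instanceof WordPair)) return false;
>>>         WordPair other = (WordPair) o;
>>>         return first.equals(other.first) && second.equals(other.second);
>>>     }
>>>
>>>     @Override
>>>     public int hashCode() {
>>>         return 31 * first.hashCode() + second.hashCode();
>>>     }
>>> }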
>>>
>>> Is this behavior expected, or am I missing something?
>>>
>>>
>>> --
>>> thanks
>>> ashish
>>>
>>> Blog: http://www.ashishpaliwal.com/blog
>>> My Photo Galleries: http://www.pbase.com/ashishpaliwal
>>
>>
>>
>>
>> --
>> thanks
>> ashish
>>
>> Blog: http://www.ashishpaliwal.com/blog
>> My Photo Galleries: http://www.pbase.com/ashishpaliwal
>
>
>
>
> --
> thanks
> ashish
>
> Blog: http://www.ashishpaliwal.com/blog
> My Photo Galleries: http://www.pbase.com/ashishpaliwal
