incubator-crunch-user mailing list archives

From Gabriel Reid <gabriel.r...@gmail.com>
Subject Re: Facing error java.io.NotSerializableException: org.apache.hadoop.io.Text
Date Wed, 26 Dec 2012 17:14:40 GMT
Hi Ashish,

Your solution looks good -- indeed, non-serializable members are typically declared transient and initialized in the initialize method.

The way Crunch works is that DoFn instances are serialized on the client, and then deserialized, initialized, and run within map and reduce tasks. A single map or reduce task will make use of one or more DoFn instances (i.e., they can be chained together within a single task).
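The lifecycle described above (serialize on the client, deserialize and initialize in the task, then run a chain of functions) can be approximated in plain Java. This is a simplified sketch, not Crunch's actual runtime: `SketchDoFn`, `SketchText`, `TrimFn`, `UpperCaseFn` and `shipAndRun` are hypothetical names. `SketchDoFn` mimics the shape of Crunch's `DoFn` (an `initialize()` hook plus `process()`) but returns one value instead of using an `Emitter`, and `SketchText` stands in for `org.apache.hadoop.io.Text`, which does not implement `java.io.Serializable`.

```java
import java.io.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for org.apache.hadoop.io.Text: deliberately NOT Serializable.
class SketchText {
    private String value = "";
    void set(String v) { value = v; }
    @Override public String toString() { return value; }
}

// Hypothetical, simplified stand-in for Crunch's DoFn (exactly one output per input).
abstract class SketchDoFn implements Serializable {
    public void initialize() {}
    public abstract String process(String input);
}

// Holds a non-serializable member: transient, created in initialize() on the task side.
class TrimFn extends SketchDoFn {
    private transient SketchText buffer;
    @Override public void initialize() { buffer = new SketchText(); }
    @Override public String process(String input) {
        buffer.set(input.trim());
        return buffer.toString();
    }
}

class UpperCaseFn extends SketchDoFn {
    @Override public String process(String input) { return input.toUpperCase(); }
}

class LifecycleSketch {
    @SuppressWarnings("unchecked")
    static List<String> shipAndRun(List<SketchDoFn> fns, List<String> records) {
        try {
            // "Client": serialize the whole chain of functions to bytes.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(new ArrayList<>(fns));
            }
            // "Task": deserialize, then initialize each function once.
            List<SketchDoFn> chain;
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                chain = (List<SketchDoFn>) in.readObject();
            }
            for (SketchDoFn fn : chain) {
                fn.initialize();
            }
            // Run each record through the chain: one function's output is the next one's input.
            List<String> results = new ArrayList<>();
            for (String record : records) {
                String value = record;
                for (SketchDoFn fn : chain) {
                    value = fn.process(value);
                }
                results.add(value);
            }
            return results;
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("could not ship functions to the task", e);
        }
    }

    public static void main(String[] args) {
        List<SketchDoFn> fns = Arrays.asList(new TrimFn(), new UpperCaseFn());
        System.out.println(shipAndRun(fns, Arrays.asList("  hello ", "crunch")));
        // prints [HELLO, CRUNCH]
    }
}
```

Because `buffer` is transient, it is simply absent from the serialized bytes and is only created after deserialization, which is why the chain can be shipped even though `SketchText` itself is not serializable.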

- Gabriel

On 26 Dec 2012, at 15:26, Ashish <paliwalashish@gmail.com> wrote:

> Hi Gabriel,
> 
> Bull's eye :) My code was holding reference to a non-transient Text instance.
> 
> Here is the culprit code
> 
> PTable<TextPair, Long> wordCoOccurrence = textFile.parallelDo(new DoFn<String, Pair<TextPair, Long>>() {
>             // Culprit: a non-transient field; TextPair holds Text objects, which are not Serializable
>             TextPair textPair = new TextPair();
>             @Override
>             public void process(String input, Emitter<Pair<TextPair, Long>> emitter) {
> 
>                 String[] words = input.split("\\s+");
> 
>                 for (int i = 0; i < words.length; i++) {
>                     String word = words[i];
>                     if(Strings.isNullOrEmpty(word)) {
>                         continue;
>                     }
> 
>                     // let's look for neighbours now
>                     int start = (i - DEFAULT_NEIGHBOUR_WINDOW < 0) ? 0 : i - DEFAULT_NEIGHBOUR_WINDOW;
>                     int end = (i + DEFAULT_NEIGHBOUR_WINDOW >= words.length) ? words.length - 1 : i + DEFAULT_NEIGHBOUR_WINDOW;
>                     for(int j = start; j <= end; j++) { // end is an inclusive index
>                         if(i == j) continue;
>                         textPair.set(new Text(words[i]), new Text(words[j]));
>                         emitter.emit(Pair.of(textPair, 1L));
>                     }
>                 }
>             }
>         }, textFile.getTypeFamily().tableOf(Writables.writables(TextPair.class), Writables.longs()));
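Why the culprit version fails can be reproduced without Crunch: Java serialization walks every non-transient field, so an object holding a field whose class is not `Serializable` cannot be written. In this minimal sketch `FakeText` is a hypothetical stand-in for `org.apache.hadoop.io.Text`, and `EagerFn`/`LazyFn` are hypothetical classes mirroring the culprit and fixed DoFns.

```java
import java.io.*;

// Hypothetical stand-in for org.apache.hadoop.io.Text, which is not java.io.Serializable.
class FakeText {
    String value = "";
}

// Mirrors the culprit DoFn: a non-transient, non-serializable field set at construction.
class EagerFn implements Serializable {
    FakeText pair = new FakeText();
}

// Mirrors the fix: the field is transient, so serialization skips it.
class LazyFn implements Serializable {
    transient FakeText pair;
}

class SerializationCheck {
    // Returns null if obj serializes cleanly, otherwise a description of the failure.
    static String trySerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return null;
        } catch (NotSerializableException e) {
            return "NotSerializableException: " + e.getMessage(); // message is the offending class name
        } catch (IOException e) {
            return "IOException: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(trySerialize(new EagerFn())); // reports FakeText
        System.out.println(trySerialize(new LazyFn()));  // prints null
    }
}
```

This is the same kind of failure as the `NotSerializableException: org.apache.hadoop.io.Text` reported at the start of the thread, just with a stand-in class.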
> 
> And this is how I fixed it
> 
> PTable<TextPair, Long> wordCoOccurrence = textFile.parallelDo(new DoFn<String, Pair<TextPair, Long>>() {
>             // transient: not serialized with the DoFn; created in initialize() inside the task
>             transient TextPair textPair;
> 
>             @Override
>             public void initialize() {
>                 super.initialize();
>                 textPair = new TextPair();
>             }
> 
>             @Override
>             public void process(String input, Emitter<Pair<TextPair, Long>> emitter) {
>                 String[] words = input.split("\\s+");
> 
>                 for (int i = 0; i < words.length; i++) {
>                     String word = words[i];
>                     if(Strings.isNullOrEmpty(word)) {
>                         continue;
>                     }
> 
>                     // let's look for neighbours now
>                     int start = (i - DEFAULT_NEIGHBOUR_WINDOW < 0) ? 0 : i - DEFAULT_NEIGHBOUR_WINDOW;
>                     int end = (i + DEFAULT_NEIGHBOUR_WINDOW >= words.length) ? words.length - 1 : i + DEFAULT_NEIGHBOUR_WINDOW;
>                     for(int j = start; j <= end; j++) { // end is an inclusive index
>                         if(i == j) continue;
>                         textPair.set(new Text(words[i]), new Text(words[j]));
>                         emitter.emit(Pair.of(textPair, 1L));
>                     }
>                 }
>             }
>         }, textFile.getTypeFamily().tableOf(Writables.writables(TextPair.class), Writables.longs()));
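The neighbour-window logic is easier to check in isolation. The following plain-Java sketch uses no Crunch or Hadoop types; `CoOccurrence.pairs` and its `window` parameter are hypothetical, with `window` playing the role of `DEFAULT_NEIGHBOUR_WINDOW`. It emits a `word:neighbour` string for every neighbour within `window` positions, treating `end` as an inclusive index and computing `start`/`end` with `Math.max`/`Math.min`, which is equivalent to the ternaries in the DoFn.

```java
import java.util.ArrayList;
import java.util.List;

class CoOccurrence {
    // Hypothetical helper: returns "word:neighbour" for every neighbour within
    // `window` positions of each word (the DoFn emits TextPair objects instead).
    static List<String> pairs(String input, int window) {
        List<String> out = new ArrayList<>();
        String trimmed = input.trim();
        if (trimmed.isEmpty()) {
            return out; // blank line: nothing to pair up
        }
        String[] words = trimmed.split("\\s+");
        for (int i = 0; i < words.length; i++) {
            int start = Math.max(0, i - window);
            int end = Math.min(words.length - 1, i + window); // inclusive
            for (int j = start; j <= end; j++) {
                if (i == j) continue; // a word is not its own neighbour
                out.add(words[i] + ":" + words[j]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(pairs("a b c", 1)); // [a:b, b:a, b:c, c:b]
    }
}
```

Trimming before splitting avoids the empty leading token that `split("\\s+")` produces for lines starting with whitespace, which the DoFn handles with the `Strings.isNullOrEmpty` check instead.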
> 
> Could you please explain how this part is converted into a Hadoop map function? Does Crunch convert these functions into normal MapReduce jobs, or is the process more involved? I have to admit I coded this the way I used to code Mapper functions.
> 
> Appreciate your help.
> 
> 
> On Wed, Dec 26, 2012 at 7:04 PM, Gabriel Reid <gabriel.reid@gmail.com> wrote:
>> Hi Ashish,
>> 
>> Are you holding on to a non-transient Text instance in a DoFn perhaps? DoFns need to remain serializable.
>> 
>> Otherwise, could you post your (non-working) code? (I'm assuming it's pretty short.)
>> 
>> - Gabriel
>> 
>> 
>> On 26 Dec 2012, at 13:54, Ashish <paliwalashish@gmail.com> wrote:
>> 
>>> Folks,
>>> 
>>> I was trying to port the word co-occurrence example (using pairs) to Crunch. I had used the famous TextPair class from Hadoop: The Definitive Guide.
>>> While running it, I am getting this error:
>>> 
>>> ERROR mr.MRPipeline: org.apache.crunch.impl.mr.run.CrunchRuntimeException: java.io.NotSerializableException: org.apache.hadoop.io.Text
>>> 
>>> As an alternative, I created a WordPair class that uses String instead of Text and implements Serializable and WritableComparable. This version worked.
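For reference, a String-based pair along the lines described above might look like the following hypothetical sketch (the original WordPair code was not posted). A real version would also declare `implements org.apache.hadoop.io.WritableComparable<WordPair>`; to keep the sketch runnable without Hadoop on the classpath, it implements only `Serializable` and `Comparable`, while providing `write`/`readFields` with the same signatures that Hadoop's `Writable` interface requires. `WordPairDemo.roundTrip` is likewise a hypothetical helper.

```java
import java.io.*;
import java.util.Objects;

// Hypothetical String-based pair. In a real job, add
// "implements org.apache.hadoop.io.WritableComparable<WordPair>".
class WordPair implements Serializable, Comparable<WordPair> {
    private String first = "";
    private String second = "";

    public WordPair() {} // no-arg constructor so Hadoop can instantiate it reflectively

    public WordPair(String first, String second) { set(first, second); }

    public void set(String first, String second) {
        this.first = first;
        this.second = second;
    }

    // Same signature as Writable.write(DataOutput).
    public void write(DataOutput out) throws IOException {
        out.writeUTF(first);
        out.writeUTF(second);
    }

    // Same signature as Writable.readFields(DataInput).
    public void readFields(DataInput in) throws IOException {
        first = in.readUTF();
        second = in.readUTF();
    }

    // Sort by first word, then by second word.
    @Override public int compareTo(WordPair other) {
        int cmp = first.compareTo(other.first);
        return cmp != 0 ? cmp : second.compareTo(other.second);
    }

    @Override public boolean equals(Object o) {
        if (!(o instanceof WordPair)) return false;
        WordPair p = (WordPair) o;
        return first.equals(p.first) && second.equals(p.second);
    }

    // Hadoop's default HashPartitioner uses hashCode() to pick a reducer.
    @Override public int hashCode() { return Objects.hash(first, second); }

    @Override public String toString() { return first + "\t" + second; }
}

class WordPairDemo {
    // Writes a pair with write() and reads it back with readFields().
    static WordPair roundTrip(WordPair pair) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            pair.write(new DataOutputStream(bytes));
            WordPair copy = new WordPair();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        WordPair p = new WordPair("hadoop", "crunch");
        System.out.println(roundTrip(p).equals(p)); // true
    }
}
```

`writeUTF` limits each string to 65,535 encoded bytes, which is ample for single words.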
>>> 
>>> Is this behavior expected or I am missing something?
>>> 
>>> 
>>> -- 
>>> thanks
>>> ashish
>>> 
>>> Blog: http://www.ashishpaliwal.com/blog
>>> My Photo Galleries: http://www.pbase.com/ashishpaliwal
> 
> 
> 
> -- 
> thanks
> ashish
> 
> Blog: http://www.ashishpaliwal.com/blog
> My Photo Galleries: http://www.pbase.com/ashishpaliwal
