Subject: Re: Facing error java.io.NotSerializableException: org.apache.hadoop.io.Text
From: Gabriel Reid <gabriel.reid@gmail.com>
To: crunch-user@incubator.apache.org
Date: Wed, 26 Dec 2012 18:14:40 +0100

Hi Ashish,

Your solution looks good -- indeed, any non-serializable members are typically initialized in the initialize method.

The way Crunch works is that DoFn instances are serialized at the client and then deserialized, initialized, and run within map and reduce tasks. A single map or reduce task will make use of one or more DoFn instances (i.e. they can be chained together within a single task).
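To make that concrete, here is a minimal sketch of the idiom (the class and field names are placeholders, not anything from your project): any member that isn't Serializable is marked transient and re-created in initialize(), which runs inside the task after the DoFn has been deserialized.

    import org.apache.crunch.DoFn;
    import org.apache.crunch.Emitter;
    import org.apache.hadoop.io.Text;

    // Sketch only: a DoFn that carries a non-serializable Hadoop Text object.
    public class NonSerializableMemberFn extends DoFn<String, String> {

        // Text is not java.io.Serializable, so keep it out of the DoFn's
        // serialized form by marking it transient.
        private transient Text buffer;

        @Override
        public void initialize() {
            super.initialize();
            // Runs inside the map/reduce task, after deserialization, so the
            // object created here never has to travel over the wire.
            buffer = new Text();
        }

        @Override
        public void process(String input, Emitter<String> emitter) {
            // Safe to use buffer here; initialize() has already run.
            buffer.set(input);
            emitter.emit(buffer.toString());
        }
    }

And regarding the String-based WordPair you mentioned in your first mail -- I'm guessing it looks roughly like the sketch below (field names assumed, equals/hashCode omitted for brevity). Because both fields are Strings the class is straightforwardly Serializable, while the Writable methods keep Hadoop happy:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.io.Serializable;
    import org.apache.hadoop.io.WritableComparable;

    // Sketch only: a pair of words that is both Serializable and WritableComparable.
    public class WordPair implements WritableComparable<WordPair>, Serializable {

        private String first = "";
        private String second = "";

        public void set(String first, String second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(first);
            out.writeUTF(second);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            first = in.readUTF();
            second = in.readUTF();
        }

        @Override
        public int compareTo(WordPair other) {
            int cmp = first.compareTo(other.first);
            return cmp != 0 ? cmp : second.compareTo(other.second);
        }
    }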
- Gabriel

On 26 Dec 2012, at 15:26, Ashish <paliwalashish@gmail.com> wrote:

> Hi Gabriel,
>
> Bull's eye :) My code was holding a reference to a non-transient Text instance.
>
> Here is the culprit code:
>
>     PTable<TextPair, Long> wordCoOccurrence = textFile.parallelDo(new DoFn<String, Pair<TextPair, Long>>() {
>         TextPair textPair = new TextPair();
>
>         @Override
>         public void process(String input, Emitter<Pair<TextPair, Long>> emitter) {
>             String[] words = input.split("\\s+");
>
>             for (int i = 0; i < words.length; i++) {
>                 String word = words[i];
>                 if (Strings.isNullOrEmpty(word)) {
>                     continue;
>                 }
>
>                 // let's look for neighbours now
>                 int start = (i - DEFAULT_NEIGHBOUR_WINDOW < 0) ? 0 : i - DEFAULT_NEIGHBOUR_WINDOW;
>                 int end = (i + DEFAULT_NEIGHBOUR_WINDOW >= words.length) ? words.length - 1 : i + DEFAULT_NEIGHBOUR_WINDOW;
>                 for (int j = start; j < end; j++) {
>                     if (i == j) continue;
>                     textPair.set(new Text(words[i]), new Text(words[j]));
>                     emitter.emit(Pair.of(textPair, 1L));
>                 }
>             }
>         }
>     }, textFile.getTypeFamily().tableOf(Writables.writables(TextPair.class), Writables.longs()));
>
> And this is how I fixed it:
>
>     PTable<TextPair, Long> wordCoOccurrence = textFile.parallelDo(new DoFn<String, Pair<TextPair, Long>>() {
>         transient TextPair textPair;
>
>         @Override
>         public void initialize() {
>             super.initialize();
>             textPair = new TextPair();
>         }
>
>         @Override
>         public void process(String input, Emitter<Pair<TextPair, Long>> emitter) {
>             String[] words = input.split("\\s+");
>
>             for (int i = 0; i < words.length; i++) {
>                 String word = words[i];
>                 if (Strings.isNullOrEmpty(word)) {
>                     continue;
>                 }
>
>                 // let's look for neighbours now
>                 int start = (i - DEFAULT_NEIGHBOUR_WINDOW < 0) ? 0 : i - DEFAULT_NEIGHBOUR_WINDOW;
>                 int end = (i + DEFAULT_NEIGHBOUR_WINDOW >= words.length) ? words.length - 1 : i + DEFAULT_NEIGHBOUR_WINDOW;
>                 for (int j = start; j < end; j++) {
>                     if (i == j) continue;
>                     textPair.set(new Text(words[i]), new Text(words[j]));
>                     emitter.emit(Pair.of(textPair, 1L));
>                 }
>             }
>         }
>     }, textFile.getTypeFamily().tableOf(Writables.writables(TextPair.class), Writables.longs()));
>
> Would you please share how this part is converted to a Hadoop map function? Does Crunch convert these functions into normal MapReduce jobs, or is the process more involved? I have to admit I coded this the way I used to code Mapper functions.
>
> Appreciate your help.
>
>
> On Wed, Dec 26, 2012 at 7:04 PM, Gabriel Reid <gabriel.reid@gmail.com> wrote:
>> Hi Ashish,
>>
>> Are you holding on to a non-transient Text instance in a DoFn, perhaps? DoFns need to remain serializable.
>>
>> Otherwise, could you post your (non-working) code? (I'm assuming it's pretty short.)
>>
>> - Gabriel
>>
>>
>> On 26 Dec 2012, at 13:54, Ashish <paliwalashish@gmail.com> wrote:
>>
>>> Folks,
>>>
>>> I was trying to port the word co-occurrence example (using pairs) to Crunch, and had used the famous TextPair class from Hadoop: The Definitive Guide. While running it I got this error:
>>>
>>> ERROR mr.MRPipeline: org.apache.crunch.impl.mr.run.CrunchRuntimeException: java.io.NotSerializableException: org.apache.hadoop.io.Text
>>>
>>> As an alternative, I created a WordPair class that uses String instead of Text and implements Serializable and WritableComparable. That version worked.
>>>
>>> Is this behavior expected, or am I missing something?
>>>
>>> --
>>> thanks
>>> ashish
>>>
>>> Blog: http://www.ashishpaliwal.com/blog
>>> My Photo Galleries: http://www.pbase.com/ashishpaliwal
>
>
> --
> thanks
> ashish
>
> Blog: http://www.ashishpaliwal.com/blog
> My Photo Galleries: http://www.pbase.com/ashishpaliwal