From: Josh Wills
Date: Mon, 29 Sep 2014 13:49:01 -0700
Subject: Re: Trouble with Avro records
To: user@crunch.apache.org

Hrm, that's surprising. We upgraded the Avro version to deal with some Java 7 stuff-- maybe exclude it and give it another go?

On the Parquet stuff, this is apparently a long-standing issue with Parquet that requires some major surgery to fix. I have some book stuff to get done over the next couple of weeks, so I don't have the time to do it myself, but I'll keep checking in with the Parquet folks.

J

On Mon, Sep 29, 2014 at 12:11 PM, Danny Morgan <unluckyboy@hotmail.com> wrote:

Hi Josh,

This workaround:

AvroMode.setSpecificClassLoader(CLASS.class.getClassLoader());

doesn't seem to work after upgrading to Crunch 0.11. If I don't set a specific class loader at all, it doesn't work either.

:(

-Shant

------------------------------
From: josh.wills@gmail.com
Date: Thu, 21 Aug 2014 13:33:40 -0700
Subject: Re: Trouble with Avro records
To: user@crunch.apache.org

On Thu, Aug 21, 2014 at 1:28 PM, Danny Morgan <unluckyboy@hotmail.com> wrote:

Awesome, adding:

AvroMode.setSpecificClassLoader(Visit.class.getClassLoader());

did the trick, thanks Josh!

Do you mind clarifying when to use Avros.records() versus Avros.specifics() or Avros.reflects()?

Oops, yes-- sorry about that. The records() method is defined on both the WritableTypeFamily and the AvroTypeFamily as a method that is supposed to handle "arbitrary" data types that aren't supported by the built-in POJO and Crunch Tuple methods. Fortunately/unfortunately, what counts as an arbitrary data type depends on the backing implementation: for Writables, records() only really supports classes that implement Writable, and for Avros, records() checks whether the class implements SpecificRecord (at which point it handles it using Avros.specifics()) and, if it does not, passes it on to Avros.reflects().

I think that, like many things in Crunch, records() reflects an early and somewhat ill-advised decision I made when I was first creating the API, one that I would likely do away with if I had it to do over again. :)

J

Thanks Again!
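A rough sketch of the dispatch described above (Visit is the generated SpecificRecord from this thread; PlainVisit is a hypothetical hand-written POJO added here only for contrast, and its fields are illustrative):

import org.apache.crunch.types.PType;
import org.apache.crunch.types.avro.Avros;
import com.test.Visit;

// Hypothetical plain POJO, not part of this thread; stands in for any
// class that does not implement SpecificRecord.
class PlainVisit {
  int dateid;
  String userid;
}

class AvroPTypeExamples {
  // Generated Avro classes implement SpecificRecord, so specifics() applies.
  static final PType<Visit> SPECIFIC = Avros.specifics(Visit.class);

  // Plain POJOs go through Avro reflection.
  static final PType<PlainVisit> REFLECTED = Avros.reflects(PlainVisit.class);

  // records() checks for SpecificRecord and delegates to specifics();
  // otherwise it falls back to reflects().
  static final PType<Visit> RECORD = Avros.records(Visit.class);
}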
------------------------------
From: josh.wills@gmail.com
Date: Thu, 21 Aug 2014 13:20:20 -0700
Subject: Re: Trouble with Avro records
To: user@crunch.apache.org

Okay. I suspect this is the problem:

https://issues.apache.org/jira/browse/CRUNCH-442

Gabriel fixed this for the upcoming 0.11 release, but there are a couple of workarounds in the comments. One is to put the Avro jar into your job jar file, instead of relying on the one that is in the Hadoop lib. The other is to configure AvroMode.setSpecificClassLoader with the class loader for your Visit class before kicking off the job.

Josh
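A minimal sketch of that second workaround (only the AvroMode call comes from this thread; the job class name and pipeline scaffolding are illustrative assumptions):

import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.types.avro.AvroMode;
import com.test.Visit;

public class VisitJob {  // hypothetical driver class
  public static void main(String[] args) throws Exception {
    // Point Avro's specific-record resolution at the class loader that can
    // actually see the generated Visit class, before any job is kicked off
    // (workaround for CRUNCH-442).
    AvroMode.setSpecificClassLoader(Visit.class.getClassLoader());

    // Illustrative pipeline setup; the actual reads, parallelDos, and writes
    // are the ones described elsewhere in this thread.
    Pipeline pipeline = new MRPipeline(VisitJob.class);
    // ... build and write PCollections/PTables here ...
    pipeline.done();
  }
}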
On Thu, Aug 21, 2014 at 1:10 PM, Danny Morgan <unluckyboy@hotmail.com> wrote:

Hi Josh,

crunch-0.10.0-hadoop2

Thanks.

------------------------------
From: josh.wills@gmail.com
Date: Thu, 21 Aug 2014 13:04:29 -0700
Subject: Re: Trouble with Avro records
To: user@crunch.apache.org

That feels like an AvroMode-related exception; which version of Crunch are you using?

J

On Thu, Aug 21, 2014 at 12:45 PM, Danny Morgan <unluckyboy@hotmail.com> wrote:

Hi Guys,

Love Crunch, but I'm having some trouble recently using Avro records. I think someone needs to write a Crunch book.

I'm trying to aggregate hourly visits to a page by each user. I do one pass to parse the records, and then I try to "group by" unique users and hour and count the number of times they visited, as well as their first visit time in the hour. Here's the Avro schema:

{"namespace": "com.test",
 "type": "record",
 "name": "Visit",
 "fields": [
     {"name": "dateid", "type": "int"},                    // Year Month Day Hour in PST as an integer, e.g. 2014081103
     {"name": "userid", "type": "string"},
     {"name": "vcount", "type": ["long", "null"]},
     {"name": "firsttimestamp", "type": ["long", "null"]}  // Unix timestamp of first visit
 ]
}

Here I do the parsing; at first vcount and firsttimestamp aren't set.

PTable<Visit, Pair<Long, Long>> visits = parsed.parallelDo("visits-parsing", new VisitsExtractor(),
        Avros.tableOf(Avros.specifics(Visit.class), Avros.pairs(Avros.longs(), Avros.longs())));

The relevant line from VisitsExtractor:

emitter.emit(Pair.of(visit, Pair.of(1L, log.timestamp())));

Everything up to this point works fine. Now I want to count up the unique visitors and the minimum timestamp.

PTable<Visit, Pair<Long, Long>> agg = visits.groupByKey()
        .combineValues(Aggregators.pairAggregator(Aggregators.SUM_LONGS(), Aggregators.MIN_LONGS()));

The above seems to work fine too. Now I want to create new Visit objects and fill in the count and minimum timestamp fields.

PCollection<Visit> count_visits = agg.parallelDo("visits-count", new DoFn<Pair<Visit, Pair<Long, Long>>, Visit>() {
    @Override
    public void process(Pair<Visit, Pair<Long, Long>> p, Emitter<Visit> emitter) {
        Visit v = Visit.newBuilder(p.first()).build();
        v.setVcount(p.second().first());
        v.setFirsttimestamp(p.second().second());
        emitter.emit(v);
    }
}, Avros.specifics(Visit.class));

count_visits.write(To.textFile(outputPath), WriteMode.OVERWRITE);

Here's the error:

2014-08-21 15:09:26,245 ERROR run.CrunchReducer (CrunchReducer.java:reduce(54)) - Reducer exception
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.test.Visit
        at com.test.Logs$1.process(Logs.java:49)
        at com.test.Logs$1.process(Logs.java:1)
        at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
        at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)

So I'm pretty sure the problem is this line:

Visit v = Visit.newBuilder(p.first()).build();

Specifically, p.first() should be a Visit, but I guess it isn't. I assume the output of the groupBy operation in the reducers is serializing the key but not using the correct Avro type to do it?

Also, I don't think I understand when I should be using Avros.records() versus Avros.specifics() when I have a generated Avro class.

Thanks!

Danny

--
Director of Data Science
Cloudera
Twitter: @josh_wills