From: Josh Wills
Date: Thu, 21 Aug 2014 13:04:29 -0700
Subject: Re: Trouble with Avro records
To: user@crunch.apache.org

That feels like an AvroMode-related exception; which version of Crunch are you using?

J


On Thu, Aug 21, 2014 at 12:45 PM, Danny Morgan <unluckyboy@hotmail.com> wrote:
Hi Guys,

Love Crunch but having some trouble recently using Avro records. I think someone needs to write a Crunch book.

I'm trying to aggregate hourly visits to a page by each user. I do one pass to parse the records and then I try to "group by" unique users and hour and count the number of times they visited as well as their first visit time in the hour. Here's the Avro schema:

{"namespace": "com.test",
 "type": "record",
 "name": "Visit",
 "fields": [
     {"name": "dateid", "type": "int"},                     // Year Month Day Hour in PST as an integer e.g. 2014081103
     {"name": "userid", "type": "string"},
     {"name": "vcount", "type": ["long", "null"]},
     {"name": "firsttimestamp", "type": ["long", "null"]}   // Unixtime stamp of first visit
 ]
}
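
The dateid encoding described in the schema comment can be packed and unpacked with plain integer arithmetic. This is only a sketch: the DateIds class and its method names are made up for illustration and are not part of the generated Visit class or of Crunch.

```java
/** Hypothetical helper (not from the original code) for the yyyyMMddHH dateid. */
final class DateIds {
    private DateIds() {}

    /** Packs year, month, day and hour into one int: (2014, 8, 11, 3) gives 2014081103. */
    static int encode(int year, int month, int day, int hour) {
        return year * 1_000_000 + month * 10_000 + day * 100 + hour;
    }

    // Each accessor strips the lower digits by division, then keeps two digits.
    static int year(int dateid)  { return dateid / 1_000_000; }
    static int month(int dateid) { return (dateid / 10_000) % 100; }
    static int day(int dateid)   { return (dateid / 100) % 100; }
    static int hour(int dateid)  { return dateid % 100; }

    public static void main(String[] args) {
        int id = encode(2014, 8, 11, 3);
        System.out.println(id + " -> " + year(id) + "-" + month(id) + "-" + day(id)
            + " hour " + hour(id));
    }
}
```

An int can hold this encoding only up to the year 2147, which is worth keeping in mind if the field is ever reused for other date ranges.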

Here I do the parsing; at first vcount and firsttimestamp aren't set.

PTable<Visit, Pair<Long, Long>> visits = parsed.parallelDo("visits-parsing", new VisitsExtractor(),
        Avros.tableOf(Avros.specifics(Visit.class), Avros.pairs(Avros.longs(), Avros.longs())));

The relevant line from VisitsExtractor:
emitter.emit(Pair.of(visit, Pair.of(1L, log.timestamp())));

Everything up to this point works fine, now I want to count up the unique visitors and the minimum timestamp.

        PTable<Visit, Pair<Long, Long>> agg = visits.groupByKey().combineValues(Aggregators.pairAggregator(Aggregators.SUM_LONGS(), Aggregators.MIN_LONGS()));
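
Conceptually, the pair aggregator above should add up the first element of each value and keep the smallest second element for every key. The following is a plain-Java stand-in for that per-key computation, not Crunch code: the SumMinSketch class, the String keys, and the sample timestamps are all made up for illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java stand-in (not Crunch) for "sum the counts, take the minimum timestamp" per key. */
final class SumMinSketch {
    private SumMinSketch() {}

    /** Builds one key -> {count, timestamp} input entry. */
    static Map.Entry<String, long[]> entry(String key, long count, long timestamp) {
        return new SimpleEntry<>(key, new long[] {count, timestamp});
    }

    /** Maps each key to {sum of counts, minimum timestamp} over all of its entries. */
    static Map<String, long[]> aggregate(List<Map.Entry<String, long[]>> values) {
        Map<String, long[]> out = new HashMap<>();
        for (Map.Entry<String, long[]> e : values) {
            long[] v = e.getValue();
            // First value for a key is copied; later ones are merged pairwise.
            out.merge(e.getKey(), new long[] {v[0], v[1]},
                (a, b) -> new long[] {a[0] + b[0], Math.min(a[1], b[1])});
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up sample data: key is "dateid/userid", value is (1, timestamp).
        Map<String, long[]> result = aggregate(Arrays.asList(
            entry("2014081103/alice", 1L, 1407751500L),
            entry("2014081103/alice", 1L, 1407751200L),
            entry("2014081103/bob", 1L, 1407752000L)));
        for (Map.Entry<String, long[]> e : result.entrySet()) {
            System.out.println(e.getKey() + " count=" + e.getValue()[0]
                + " first=" + e.getValue()[1]);
        }
    }
}
```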

The above seems to work fine too, now I want to create new Visit classes and fill in the count and minimum timestamp fields.

        PCollection<Visit> count_visits = agg.parallelDo("visits-count", new DoFn<Pair<Visit, Pair<Long, Long>>, Visit>() {
          @Override
          public void process(Pair<Visit, Pair<Long, Long>> p, Emitter<Visit> emitter) {
            Visit v = Visit.newBuilder(p.first()).build();
            v.setVcount(p.second().first());
            v.setFirsttimestamp(p.second().second());
            emitter.emit(v);
          }
        }, Avros.specifics(Visit.class));
    }

        count_visits.write(To.textFile(outputPath), WriteMode.OVERWRITE);

Here's the error:

2014-08-21 15:09:26,245 ERROR run.CrunchReducer (CrunchReducer.java:reduce(54)) - Reducer exception
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.test.Visit
        at com.test.Logs$1.process(Logs.java:49)
        at com.test.Logs$1.process(Logs.java:1)
        at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
        at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)

So I'm pretty sure the problem is this line:

Visit v = Visit.newBuilder(p.first()).build();

Specifically, p.first() should be a Visit type but I guess it isn't. I assume the output of the groupBy operation in the reducers is serializing the key but not using the correct Avro type to do it?
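
One way to read the stack trace: because of type erasure, Java only checks the cast to Visit at the point where the value is used, which would explain why the exception surfaces inside process() rather than where the key is read. Here is a minimal plain-Java sketch of that effect. Holder, GenericThing and SpecificThing are made-up stand-ins, not Avro or Crunch classes, and the sketch says nothing about why Crunch hands back a generic record in the first place.

```java
/** Made-up stand-in for a generic record. */
class GenericThing {}

/** Made-up stand-in for a generated specific class. */
class SpecificThing {
    String userid() { return "u1"; }
}

/** A container whose element type is erased at runtime, much like a generic pair. */
final class Holder<T> {
    private final Object value;

    Holder(Object value) { this.value = value; }

    @SuppressWarnings("unchecked")
    T first() { return (T) value; }   // unchecked cast: nothing is verified here at runtime
}

final class ErasureSketch {
    private ErasureSketch() {}

    /** Returns true if using the held value as a SpecificThing throws ClassCastException. */
    static boolean failsAtUseSite(Holder<SpecificThing> p) {
        try {
            SpecificThing s = p.first();   // the compiler-inserted cast is checked here
            s.userid();
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsAtUseSite(new Holder<>(new GenericThing())));
        System.out.println(failsAtUseSite(new Holder<>(new SpecificThing())));
    }
}
```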

Also I don't think I understand when I should be using Avros.records() versus Avros.specifics() when I have a generated avro file.

Thanks!

Danny