From: Danny Morgan
To: "user@crunch.apache.org"
Subject: Trouble with Avro records
Date: Thu, 21 Aug 2014 19:45:12 +0000

Hi Guys,

Love Crunch, but I've been having some trouble recently using Avro records. I think someone needs to write a Crunch book.

I'm trying to aggregate hourly visits to a page by each user. I do one pass to parse the records, and then I try to "group by" unique users and hour and count the number of times they visited, as well as their first visit time in the hour. Here's the Avro schema:

{"namespace": "com.test",
 "type": "record",
 "name": "Visit",
 "fields": [
     {"name": "dateid", "type": "int"},                     // Year Month Day Hour in PST as an integer, e.g. 2014081103
     {"name": "userid", "type": "string"},
     {"name": "vcount", "type": ["long", "null"]},
     {"name": "firsttimestamp", "type": ["long", "null"]}   // Unix timestamp of first visit
 ]
}

Here I do the parsing; at first vcount and firsttimestamp aren't set.

    PTable<Visit, Pair<Long, Long>> visits = parsed.parallelDo("visits-parsing", new VisitsExtractor(),
            Avros.tableOf(Avros.specifics(Visit.class), Avros.pairs(Avros.longs(), Avros.longs())));

The relevant line from VisitsExtractor:

    emitter.emit(Pair.of(visit, Pair.of(1L, log.timestamp())));

Everything up to this point works fine. Now I want to count the visits for each unique visitor and find the minimum timestamp.

    PTable<Visit, Pair<Long, Long>> agg = visits.groupByKey().combineValues(
            Aggregators.pairAggregator(Aggregators.SUM_LONGS(), Aggregators.MIN_LONGS()));

The above seems to work fine too. Now I want to create new Visit objects and fill in the count and minimum timestamp fields.

    PCollection<Visit> count_visits = agg.parallelDo("visits-count", new DoFn<Pair<Visit, Pair<Long, Long>>, Visit>() {
        @Override
        public void process(Pair<Visit, Pair<Long, Long>> p, Emitter<Visit> emitter) {
            Visit v = Visit.newBuilder(p.first()).build();
            v.setVcount(p.second().first());
            v.setFirsttimestamp(p.second().second());
            emitter.emit(v);
        }
    }, Avros.specifics(Visit.class));

    count_visits.write(To.textFile(outputPath), WriteMode.OVERWRITE);

Here's the error:

2014-08-21 15:09:26,245 ERROR run.CrunchReducer (CrunchReducer.java:reduce(54)) - Reducer exception
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.test.Visit
        at com.test.Logs$1.process(Logs.java:49)
        at com.test.Logs$1.process(Logs.java:1)
        at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
        at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)

So I'm pretty sure the problem is this line:

    Visit v = Visit.newBuilder(p.first()).build();

Specifically, p.first() should be a Visit, but I guess it isn't. I assume the output of the groupBy operation in the reducers is serializing the key but not using the correct Avro type to do it?

Also, I don't think I understand when I should be using Avros.records() versus Avros.specifics() when I have a generated Avro file.

Thanks!

Danny
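
As a reference point for the result expected from the groupByKey().combineValues(...) step described above, here is a minimal plain-Java sketch of the same computation: for each (dateid, userid) key, sum the per-visit counts of 1 and take the minimum timestamp. It uses neither Crunch nor Avro, and it does not reproduce or fix the ClassCastException. The class VisitAggregationSketch and its members ParsedVisit, Totals, and aggregate are hypothetical names introduced only for illustration, and the timestamps in main are made-up sample values.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch; every name here is hypothetical and none of it is Crunch API. */
final class VisitAggregationSketch {

    /** One parsed log line: the hour bucket, the user, and the visit time. */
    static final class ParsedVisit {
        final int dateid;       // e.g. 2014081103
        final String userid;
        final long timestamp;   // Unix timestamp of this visit

        ParsedVisit(int dateid, String userid, long timestamp) {
            this.dateid = dateid;
            this.userid = userid;
            this.timestamp = timestamp;
        }
    }

    /** Running totals for one (dateid, userid) group. */
    static final class Totals {
        long vcount = 0L;                        // plays the role of SUM_LONGS over the 1L values
        long firstTimestamp = Long.MAX_VALUE;    // plays the role of MIN_LONGS over the timestamps
    }

    /** Groups visits by (dateid, userid) and folds each group into its Totals. */
    static Map<String, Totals> aggregate(List<ParsedVisit> visits) {
        Map<String, Totals> byKey = new LinkedHashMap<>();
        for (ParsedVisit v : visits) {
            String key = v.dateid + "|" + v.userid;
            Totals t = byKey.computeIfAbsent(key, k -> new Totals());
            t.vcount += 1L;
            t.firstTimestamp = Math.min(t.firstTimestamp, v.timestamp);
        }
        return byKey;
    }

    public static void main(String[] args) {
        // Made-up sample data: two visits by one user and one by another in the same hour.
        List<ParsedVisit> visits = Arrays.asList(
                new ParsedVisit(2014081103, "alice", 1407751500L),
                new ParsedVisit(2014081103, "bob", 1407751300L),
                new ParsedVisit(2014081103, "alice", 1407751250L));

        for (Map.Entry<String, Totals> e : aggregate(visits).entrySet()) {
            System.out.println(e.getKey() + " vcount=" + e.getValue().vcount
                    + " firsttimestamp=" + e.getValue().firstTimestamp);
        }
    }
}
```

Comparing the output of a sketch like this on a small sample against the job's output is one way to check the aggregation independently of the Avro key type problem.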