crunch-user mailing list archives

From Danny Morgan <unlucky...@hotmail.com>
Subject RE: Trouble with Avro records
Date Thu, 21 Aug 2014 20:10:30 GMT
Hi Josh,

crunch-0.10.0-hadoop2

Thanks.

From: josh.wills@gmail.com
Date: Thu, 21 Aug 2014 13:04:29 -0700
Subject: Re: Trouble with Avro records
To: user@crunch.apache.org

That feels like an AvroMode-related exception; which version of Crunch are you using?
J

On Thu, Aug 21, 2014 at 12:45 PM, Danny Morgan <unluckyboy@hotmail.com> wrote:





Hi Guys,



Love Crunch, but I've been having some trouble recently using Avro records. I think someone
needs to write a Crunch book.

I'm trying to aggregate hourly visits to a page by each user. I do one pass to parse the records,
and then I try to "group by" unique user and hour and count the number of times they visited,
as well as their first visit time in the hour. Here's the Avro schema:



{"namespace": "com.test",
 "type": "record",
 "name": "Visit",
 "fields": [
     {"name": "dateid", "type": "int"},            // Year Month Day Hour in PST as an integer, e.g. 2014081103
     {"name": "userid", "type": "string"},
     {"name": "vcount",  "type": ["long", "null"]},
     {"name": "firsttimestamp",  "type": ["long", "null"]} // Unix timestamp of first visit
 ]
}
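
For readers following along, the dateid encoding described in the schema comment can be sketched in plain Java. This is only an illustration under assumptions that are not in the original message: that the log timestamp is in Unix seconds, and that "PST" means the America/Los_Angeles zone (which observes daylight saving time). The class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

class DateIdSketch {
    // Assumption: "PST" is taken to mean America/Los_Angeles, not a fixed UTC-8 offset.
    static final ZoneId ZONE = ZoneId.of("America/Los_Angeles");

    /** Packs year, month, day and hour into one int of the form yyyyMMddHH. */
    static int toDateId(long epochSeconds) {
        ZonedDateTime t = Instant.ofEpochSecond(epochSeconds).atZone(ZONE);
        // The result fits in an int for any year up to 2146.
        return t.getYear() * 1_000_000
                + t.getMonthValue() * 10_000
                + t.getDayOfMonth() * 100
                + t.getHour();
    }

    public static void main(String[] args) {
        // 10:30 UTC on 2014-08-11 is 03:30 in Los Angeles (UTC-7 in August).
        long ts = Instant.parse("2014-08-11T10:30:00Z").getEpochSecond();
        System.out.println(toDateId(ts)); // prints 2014081103
    }
}
```

If the logs really use a fixed UTC-8 offset all year, ZoneOffset.ofHours(-8) could be used in place of the named zone.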

Here I do the parsing; at first vcount and firsttimestamp aren't set.

PTable<Visit, Pair<Long, Long>> visits = parsed.parallelDo("visits-parsing", new VisitsExtractor(),
    Avros.tableOf(Avros.specifics(Visit.class), Avros.pairs(Avros.longs(), Avros.longs())));



The relevant line from VisitsExtractor:
emitter.emit(Pair.of(visit, Pair.of(1L, log.timestamp())));

Everything up to this point works fine. Now I want to count up the unique visitors and find the
minimum timestamp.



PTable<Visit, Pair<Long, Long>> agg = visits.groupByKey().combineValues(
    Aggregators.pairAggregator(Aggregators.SUM_LONGS(), Aggregators.MIN_LONGS()));
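
As a rough illustration of what this step is meant to compute, here is a plain-Java sketch (no Crunch involved) that folds (count, timestamp) values per key by summing the counts and keeping the smallest timestamp. The class name, the string keys, and the sample timestamps are all hypothetical; in the real pipeline the key is a Visit record.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class VisitAggregationSketch {
    /**
     * Folds values per key. Each long[] is {count, timestamp}: counts are
     * summed and the minimum timestamp is kept.
     */
    static Map<String, long[]> aggregate(String[] keys, long[][] values) {
        Map<String, long[]> result = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            long[] value = {values[i][0], values[i][1]};
            // merge() stores the value for a new key, otherwise combines old and new.
            result.merge(keys[i], value,
                    (a, b) -> new long[] {a[0] + b[0], Math.min(a[1], b[1])});
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical keys of the form "dateid|userid"; timestamps are made up.
        String[] keys = {"2014081103|alice", "2014081103|bob", "2014081103|alice"};
        long[][] values = {{1L, 1407753000L}, {1L, 1407753100L}, {1L, 1407752900L}};
        aggregate(keys, values).forEach((k, v) ->
                System.out.println(k + " -> count=" + v[0] + ", first=" + v[1]));
    }
}
```

Note that this only groups correctly if the key carries nothing but dateid and userid; any field that differs between records for the same user and hour would split them into separate groups.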

The above seems to work fine too. Now I want to create new Visit objects and fill in the count
and minimum timestamp fields.



PCollection<Visit> count_visits = agg.parallelDo("visits-count",
    new DoFn<Pair<Visit, Pair<Long, Long>>, Visit>() {
      @Override
      public void process(Pair<Visit, Pair<Long, Long>> p, Emitter<Visit> emitter) {
        Visit v = Visit.newBuilder(p.first()).build();
        v.setVcount(p.second().first());
        v.setFirsttimestamp(p.second().second());
        emitter.emit(v);
      }
    }, Avros.specifics(Visit.class));


    }

     count_visits.write(To.textFile(outputPath), WriteMode.OVERWRITE);

Here's the error:
2014-08-21 15:09:26,245 ERROR run.CrunchReducer (CrunchReducer.java:reduce(54)) - Reducer exception
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.test.Visit


        at com.test.Logs$1.process(Logs.java:49)
        at com.test.Logs$1.process(Logs.java:1)
        at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
        at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)



So I'm pretty sure the problem is this line:

Visit v = Visit.newBuilder(p.first()).build();

Specifically, p.first() should be a Visit, but I guess it isn't. I assume the output of
the groupBy operation in the reducers is serializing the key but not using the correct Avro
type to do it?
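
A side note on why the stack trace points into process() rather than into any deserialization code: Java generics are erased at runtime, so a Pair whose key has the wrong runtime type can be passed around without complaint, and the ClassCastException only fires where the key is first used as the declared type. The sketch below reproduces that behavior in plain Java with hypothetical stand-in classes. It does not explain why a generic record was produced in the first place; it only shows why the failure surfaces at that line.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

class ErasureCastSketch {
    // Hypothetical stand-ins for a generic Avro record and a generated Visit class.
    static class GenericRecordStandIn { }
    static class VisitStandIn { }

    /** Simulates a framework handing back a key of the wrong runtime type. */
    @SuppressWarnings("unchecked")
    static Map.Entry<VisitStandIn, Long> deserialize() {
        Map.Entry<?, Long> raw =
                new SimpleEntry<Object, Long>(new GenericRecordStandIn(), 1L);
        // Unchecked cast: type arguments are erased, so nothing is verified here.
        return (Map.Entry<VisitStandIn, Long>) raw;
    }

    /** Returns true if reading the key as VisitStandIn throws ClassCastException. */
    static boolean failsAtUseSite() {
        Map.Entry<VisitStandIn, Long> p = deserialize(); // succeeds despite the wrong key type
        try {
            VisitStandIn v = p.getKey(); // compiler-inserted cast fails here
            return v == null;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("fails at use site: " + failsAtUseSite());
    }
}
```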



Also, I don't think I understand when I should be using Avros.records() versus Avros.specifics()
when I have a generated Avro file.

Thanks!

Danny
 		 	   		  

 		 	   		  

 		 	   		  