crunch-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Danny Morgan <unlucky...@hotmail.com>
Subject RE: Trouble with Avro records
Date Thu, 21 Aug 2014 20:28:06 GMT
Awesome adding:

AvroMode.setSpecificClassLoader(Visit.class.getClassLoader());

Did the trick, thanks Josh!

Do you mind clarifying when to use  Avros.records() 
versus Avros.specifics() or Avros.reflects()?

Thanks Again!


From: josh.wills@gmail.com
Date: Thu, 21 Aug 2014 13:20:20 -0700
Subject: Re: Trouble with Avro records
To: user@crunch.apache.org

Okay. I suspect this is the problem:
https://issues.apache.org/jira/browse/CRUNCH-442

Gabriel fixed this for the upcoming 0.11 release, but there are a couple of workarounds in
the comments. One is to put the avro jar into your job jar file, instead of relying on the
one that is in the hadoop lib. The other is to configure AvroMode.setSpecificClassLoader w/the
class loader for your Visit class before kicking off the job.



Josh

On Thu, Aug 21, 2014 at 1:10 PM, Danny Morgan <unluckyboy@hotmail.com> wrote:





Hi Josh,

crunch-0.10.0-hadoop2

Thanks.

From: josh.wills@gmail.com
Date: Thu, 21 Aug 2014 13:04:29 -0700
Subject: Re: Trouble with Avro records


To: user@crunch.apache.org

That feels like an AvroMode-related exception; which version of Crunch are you using?


J

On Thu, Aug 21, 2014 at 12:45 PM, Danny Morgan <unluckyboy@hotmail.com> wrote:





Hi Guys,



Love crunch but having some trouble recently using Avro records. I think someone needs to
write a Crunch book.

I'm trying to aggregate hourly visits to a page by each user. I do one pass to parse the records
and then I try to "group by" unique users and hour and count the number of times they visited
as well as their first visit time in the hour. Here's the Avro schema





{"namespace": "com.test",
 "type": "record",
 "name": "Visit",
 "fields": [
     {"name": "dateid", "type": "int"},            // Year Month Day Hour in PST as an integer
e.g. 2014081103




     {"name": "userid", "type": "string"}, 
     {"name": "vcount",  "type": ["long", "null"]},
     {"name": "firsttimestamp",  "type": ["long", "null"]} // Unixtime stamp of first visit




 ]
}

Here I do the parsing, at first vcount and firstvisit aren't set.

PTable<Visit, Pair<Long, Long>> visits = parsed.parallelDo("visits-parsing", new
VisitsExtractor(),
          Avros.tableOf(Avros.specifics(Visit.class), Avros.pairs(Avros.longs(), Avros.longs())));





The relevant line from VisitsExtractor:
emitter.emit(Pair.of(visit, Pair.of(1L, log.timestamp())));

Everything up to this point works fine, now I want to count up the unique visitors and the
minimum timestamp.





        PTable<Visit, Pair<Long, Long>> agg = visits.groupByKey().combineValues(Aggregators.pairAggregator(Aggregators.SUM_LONGS(),
Aggregators.MIN_LONGS()));

The above seems to work fine too, now I want to create new Visit classes and fill in the count
and minimum timestamp fields.





        PCollection<Visit> count_visits = agg.parallelDo("visits-count", new DoFn<Pair<Visit,
Pair<Long, Long>>, Visit>() {
          @Override   
          public void process(Pair<Visit, Pair<Long, Long>> p, Emitter<Visit>
emitter) {




            Visit v = Visit.newBuilder(p.first()).build();
            v.setVcount(p.second().first());
            v.setFirsttimestamp(p.second().second());
            emitter.emit(v);
          }
       }, Avros.specifics(Visit.class));




    }

     count_visits.write(To.textFile(outputPath), WriteMode.OVERWRITE);

Here's the error:
2014-08-21 15:09:26,245 ERROR run.CrunchReducer (CrunchReducer.java:reduce(54)) - Reducer
exception
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to
com.test.Visit




        at com.test.Logs$1.process(Logs.java:49)
        at com.test.Logs$1.process(Logs.java:1)
        at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
        at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)





So I'm pretty sure the problem is this line:

Visit v = Visit.newBuilder(p.first()).build();

specifically p.first() should be a Visit type but I guess it isn't. I assume the output of
the groupBy operation in the reducers is serializing the key but not using the correct Avro
type to do it?





Also I don't think I understand when I should be using Avros.records() versus Avros.specifics()
when I have a generated avro file.

Thanks!

Danny
 		 	   		  



 		 	   		  

 		 	   		  

 		 	   		  
Mime
View raw message