crunch-user mailing list archives

From Mattijs Jonker <>
Subject How to keep schema-derived AvroType in MapFn generic?
Date Mon, 16 Mar 2015 15:32:31 GMT

I am trying to sort Avro data on a given field in a Crunch pipeline.
Since the field in question (a timestamp) is not the first field in the
Avro schema (and hence does not determine the default sort order), I
first map each Record to a Pair<Long, Record> so that Sort.sort orders
on the desired field. My code below [2] is loosely inspired by the first
DoFn in [1].
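The keyed-sort idea described above is the classic decorate-sort-undecorate pattern: attach the sort key, sort on the key only, then drop the key. As a plain-Java sketch with no Crunch or Avro involved (`LogRecord` and `sortByTimestamp` are hypothetical stand-ins for the Avro record and the pipeline stages), it looks like this:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class KeyedSortSketch {
    // Hypothetical stand-in for an Avro record; the timestamp is deliberately not the first field.
    static final class LogRecord {
        final String source;
        final long timestamp;

        LogRecord(String source, long timestamp) {
            this.source = source;
            this.timestamp = timestamp;
        }
    }

    static List<LogRecord> sortByTimestamp(List<LogRecord> input) {
        // Decorate: pair each record with its sort key (the role of the first MapFn).
        List<Map.Entry<Long, LogRecord>> keyed = new ArrayList<>();
        for (LogRecord r : input) {
            keyed.add(new AbstractMap.SimpleImmutableEntry<>(r.timestamp, r));
        }
        // Sort on the key only (the role of Sort.sort on the keyed collection).
        keyed.sort(Map.Entry.comparingByKey());
        // Undecorate: drop the key again (the role of the second MapFn).
        List<LogRecord> sorted = new ArrayList<>();
        for (Map.Entry<Long, LogRecord> e : keyed) {
            sorted.add(e.getValue());
        }
        return sorted;
    }

    public static void main(String[] args) {
        List<LogRecord> logs = new ArrayList<>();
        logs.add(new LogRecord("b", 300L));
        logs.add(new LogRecord("a", 100L));
        logs.add(new LogRecord("c", 200L));
        for (LogRecord r : sortByTimestamp(logs)) {
            System.out.println(r.source + " " + r.timestamp);
        }
    }
}
```

Running `main` prints the records in timestamp order (a 100, c 200, b 300) even though the key is not the record's first field.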

Unfortunately, I run into a ClassCastException [3] that I find hard to
solve on my own. I do not fully understand how types are handled at
runtime, but my guess is that, based on the name and namespace in the
schema, the first MapFn produces a namespace.OurAvroDataClass object
(which is a SpecificRecord).
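An exception at the `return new Record(input.second(), true);` line is consistent with Java's type erasure: the `Record` in `Pair<Long, Record>` is not checked at runtime, so an object of another class can be carried along unnoticed and only fail where the compiler inserted a cast. The plain-Java sketch below reproduces that failure mode with hypothetical stand-in types (`Indexed`, `GenericRec`, `SpecificRec` and the `deserialize` helper are illustrative only, not Avro or Crunch API). It also shows that code declared against a shared interface accepts either class. In Avro, `IndexedRecord` is such an interface, implemented by both `GenericData.Record` and generated `SpecificRecord` classes; whether programming against it is the right fix for this pipeline is an assumption, not something confirmed here.

```java
import java.util.ArrayList;
import java.util.List;

public class ErasureCastSketch {
    // Hypothetical stand-ins: Indexed ~ Avro's IndexedRecord,
    // GenericRec ~ GenericData.Record, SpecificRec ~ a generated specific class.
    interface Indexed {
        Object get(int i);
    }

    static final class GenericRec implements Indexed {
        private final Object[] values;
        GenericRec(Object... values) { this.values = values; }
        public Object get(int i) { return values[i]; }
    }

    static final class SpecificRec implements Indexed {
        private final long timestamp;
        SpecificRec(long timestamp) { this.timestamp = timestamp; }
        public Object get(int i) {
            if (i != 0) throw new IndexOutOfBoundsException(String.valueOf(i));
            return timestamp;
        }
    }

    // Hypothetical deserializer that hands back a different class than the caller's type parameter.
    @SuppressWarnings("unchecked")
    static <T> List<T> deserialize() {
        List<Object> out = new ArrayList<>();
        out.add(new SpecificRec(42L));
        return (List<T>) (List<?>) out; // unchecked: erased, so nothing is verified here
    }

    // Declares the concrete class (like Pair<Long, Record>): fails at the use site.
    static String useConcreteType() {
        List<GenericRec> records = deserialize();
        try {
            GenericRec r = records.get(0); // compiler-inserted checkcast throws here
            return "ok: " + r.get(0);
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    // Declares only the shared interface: either implementation is accepted.
    static String useInterface() {
        List<Indexed> records = deserialize();
        Indexed r = records.get(0);
        return "ok: " + r.get(0);
    }

    public static void main(String[] args) {
        System.out.println(useConcreteType());
        System.out.println(useInterface());
    }
}
```

The first call reports the ClassCastException; the second prints `ok: 42`, because the check is against the interface both classes share.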

I would appreciate it if somebody could hint at how to overcome the
exception. An alternative way to achieve this sorting is also welcome.





[2]

// Record is org.apache.avro.generic.GenericData.Record
PTypeFamily tf = coalescedStagedLogs.getTypeFamily();

// Key each record by its timestamp field so the sort orders on it
keyedStagedLogs = coalescedStagedLogs.parallelDo(
  new MapFn<Record, Pair<Long, Record>>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Pair<Long, Record> map(Record input) {
      Long record_ts_key = (Long) input.get(partition_time_sourcename);
      return Pair.of(record_ts_key, input);
    }
  }, tf.pairs(tf.longs(), Avros.generics(schema)));

// Sort ascending on the Long key
sortedKeyedStagedLogs = Sort.sort(keyedStagedLogs, Sort.Order.ASCENDING);

// Drop the key again, deep-copying the record
sortedStagedLogs = sortedKeyedStagedLogs.parallelDo(
  new MapFn<Pair<Long, Record>, Record>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Record map(Pair<Long, Record> input) {
      return new Record(input.second(), true);
    }
  }, Avros.generics(schema)); // assumed output PType

[3]

Caused by: java.lang.ClassCastException: namespace.OurAvroDataClass
cannot be cast to org.apache.avro.generic.GenericData$Record

thrown at: return new Record(input.second(), true);
