crunch-user mailing list archives

From Mattijs Jonker <m.jon...@utwente.nl>
Subject How to keep schema-derived AvroType in MapFn generic?
Date Mon, 16 Mar 2015 15:32:31 GMT
Hello,

I am trying to sort Avro data on a given field in a Crunch
pipeline. The field in question (a timestamp) is not the first field
in the Avro schema, so it does not determine the primary sort order.
I therefore first map each Record to a Pair<Long, Record> so that
Sort.sort orders on the desired field. My code below [2] is loosely
inspired by the first DoFn in [1].
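The keying step described above is a decorate-sort-undecorate pattern. As a plain-Java, in-memory analogue (not Crunch code; `LogRecord` and `sortByKey` are hypothetical names chosen for illustration), it might look like this. One hedged difference: `Sort.sort` on a `Pair<Long, Record>` presumably compares the whole pair, whereas this sketch compares keys only and keeps ties in input order.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.ToLongFunction;

final class KeyedSortSketch {

    // Hypothetical stand-in for an Avro record whose timestamp is not its first field.
    static final class LogRecord {
        final String host;
        final long timestamp;

        LogRecord(String host, long timestamp) {
            this.host = host;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return host + "@" + timestamp;
        }
    }

    // Mirrors the three pipeline stages in memory:
    // "sort-pre" pairs each record with its key, the sort orders the pairs,
    // and "sort-post" strips the key again.
    static <R> List<R> sortByKey(List<R> records, ToLongFunction<R> keyOf) {
        List<Map.Entry<Long, R>> keyed = new ArrayList<>();
        for (R r : records) {
            keyed.add(new SimpleEntry<Long, R>(keyOf.applyAsLong(r), r));
        }
        // Compares keys only; List.sort is stable, so equal keys keep input order.
        keyed.sort(Map.Entry.comparingByKey());

        List<R> sorted = new ArrayList<>(keyed.size());
        for (Map.Entry<Long, R> e : keyed) {
            sorted.add(e.getValue());
        }
        return sorted;
    }

    public static void main(String[] args) {
        List<LogRecord> logs = List.of(
                new LogRecord("b", 30L),
                new LogRecord("a", 10L),
                new LogRecord("c", 20L));
        System.out.println(sortByKey(logs, r -> r.timestamp));
    }
}
```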

Unfortunately, I encounter a ClassCastException [3] that I find hard to
solve on my own. I do not fully understand how types are handled at
runtime, but my guess is that, based on the name and namespace in the
schema, the first MapFn produces a namespace.OurAvroDataClass object
(which is a SpecificRecord) rather than a GenericData.Record.
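If that guess is right, the exception follows from Java type erasure: the declared element type is GenericData.Record, but nothing checks the runtime objects until the compiler inserts a cast where code reads one, here at `new Record(input.second(), true)`. The stdlib-only sketch below reproduces that mechanism with hypothetical stand-in types (`GenericLike`, `GenericRec`, `SpecificRec` are not Avro classes). In Avro, both GenericData.Record and generated SpecificRecordBase subclasses implement org.apache.avro.generic.GenericRecord, which is what the interface path in the sketch models.

```java
import java.util.ArrayList;
import java.util.List;

final class ErasureCastSketch {

    // Hypothetical analogue of org.apache.avro.generic.GenericRecord.
    interface GenericLike {
        Object get(String field);
    }

    // Hypothetical analogue of GenericData.Record.
    static final class GenericRec implements GenericLike {
        @Override
        public Object get(String field) {
            return "generic:" + field;
        }
    }

    // Hypothetical analogue of a generated class such as namespace.OurAvroDataClass.
    static final class SpecificRec implements GenericLike {
        @Override
        public Object get(String field) {
            return "specific:" + field;
        }
    }

    // Simulates a reader that hands back the generated class although the caller
    // asked for generic records. Erasure means nothing checks this at the return.
    @SuppressWarnings("unchecked")
    static List<GenericRec> readRecords() {
        List<Object> raw = new ArrayList<>();
        raw.add(new SpecificRec());
        return (List<GenericRec>) (List<?>) raw;
    }

    // Reading an element as GenericRec makes javac insert a cast to GenericRec,
    // which fails at runtime, like input.second() being cast to Record.
    static boolean concreteAccessFails() {
        List<GenericRec> records = readRecords();
        try {
            GenericRec r = records.get(0);
            r.get("ts");
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    // Reading through the shared interface only requires a GenericLike,
    // which the generated class also implements, so this succeeds.
    static Object interfaceAccess() {
        List<? extends GenericLike> records = readRecords();
        GenericLike r = records.get(0);
        return r.get("ts");
    }

    public static void main(String[] args) {
        System.out.println("concrete cast fails: " + concreteAccessFails());
        System.out.println("interface access: " + interfaceAccess());
    }
}
```

If the analogy holds, one thing to try in [2] would be to treat input.second() as a GenericRecord, or to use Avros.specifics with the generated class, instead of copying it into a GenericData.Record. This is untested against Crunch.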

I would appreciate it if somebody could hint at how to overcome the
exception. An alternative way to achieve this sorting is also welcome.

Sincerely,

Mattijs


[1]
http://blog.cloudera.com/blog/2014/05/how-to-process-time-series-data-using-apache-crunch/

[2]

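import org.apache.avro.generic.GenericData.Record;
import org.apache.crunch.MapFn;
import org.apache.crunch.Pair;
import org.apache.crunch.lib.Sort;
import org.apache.crunch.types.PTypeFamily;
import org.apache.crunch.types.avro.Avros;

// coalescedStagedLogs, schema and partition_time_sourcename are defined elsewhere.
PTypeFamily tf = coalescedStagedLogs.getTypeFamily();

// Key each record by its timestamp so that Sort.sort orders on that field first.
keyedStagedLogs = coalescedStagedLogs.parallelDo(
  "sort-pre",
  new MapFn<Record, Pair<Long, Record>>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Pair<Long, Record> map(Record input) {
      Long record_ts_key = (Long) input.get(partition_time_sourcename);
      return Pair.of(record_ts_key, input);
    }
  },
  tf.pairs(tf.longs(), Avros.generics(schema))
);

sortedKeyedStagedLogs = Sort.sort(keyedStagedLogs, Sort.Order.ASCENDING);

// Drop the key again; the ClassCastException in [3] is thrown on the return line.
sortedStagedLogs = sortedKeyedStagedLogs.parallelDo(
  "sort-post",
  new MapFn<Pair<Long, Record>, Record>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Record map(Pair<Long, Record> input) {
      // Deep copy of the value half of the pair.
      return new Record(input.second(), true);
    }
  },
  Avros.generics(schema)
);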

[3]
Caused by: java.lang.ClassCastException: namespace.OurAvroDataClass
cannot be cast to org.apache.avro.generic.GenericData$Record at
namespace.OurCrunchToolClass$2.map(OurCrunchToolClass.java:ln)

ln => return new Record(input.second(), true);
