crunch-user mailing list archives

From Josh Wills <jwi...@cloudera.com>
Subject Re: How to keep schema-derived AvroType in MapFn generic?
Date Mon, 16 Mar 2015 16:30:13 GMT
David's version should fix the error; the problem is that your
namespace.OurAvroDataClass isn't a subclass of GenericData.Record.
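
A minimal plain-Java analogy of that failure, with no Avro or Crunch dependency. The names `RecordLike`, `GenericRec`, and `SpecificRec` are hypothetical stand-ins. `GenericRec` plays the role of GenericData.Record and `SpecificRec` the role of a generated class such as namespace.OurAvroDataClass. The sketch assumes both share an interface (in Avro that would be GenericRecord, if your generated classes implement it; check this for your Avro version) but neither extends the other. A cast to the concrete generic class therefore fails, while code written against the interface works for both.

```java
import java.util.HashMap;
import java.util.Map;

public class CastDemo {
    // Hypothetical shared interface (stand-in for something like GenericRecord).
    interface RecordLike {
        Object get(String field);
    }

    // Hypothetical stand-in for GenericData.Record: a map-backed record.
    static class GenericRec implements RecordLike {
        private final Map<String, Object> fields = new HashMap<>();

        GenericRec put(String field, Object value) {
            fields.put(field, value);
            return this;
        }

        public Object get(String field) {
            return fields.get(field);
        }
    }

    // Hypothetical stand-in for a generated specific class: it implements the
    // same interface but is NOT a subclass of GenericRec.
    static class SpecificRec implements RecordLike {
        private final long timestamp;

        SpecificRec(long timestamp) {
            this.timestamp = timestamp;
        }

        public Object get(String field) {
            return "timestamp".equals(field) ? timestamp : null;
        }
    }

    // Returns true when casting to the concrete generic class throws.
    static boolean castToConcreteFails(Object value) {
        try {
            GenericRec ignored = (GenericRec) value;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    // Reading through the shared interface works for either implementation.
    static long timestampOf(RecordLike rec) {
        return (Long) rec.get("timestamp");
    }

    public static void main(String[] args) {
        RecordLike specific = new SpecificRec(1426523413000L);
        RecordLike generic = new GenericRec().put("timestamp", 1426523413000L);
        System.out.println(castToConcreteFails(specific)); // true
        System.out.println(timestampOf(specific));          // 1426523413000
        System.out.println(timestampOf(generic));           // 1426523413000
    }
}
```

The design point is the same one David's version relies on: declare the function over a type that the runtime object actually is, rather than over a concrete class it merely resembles.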

On Mon, Mar 16, 2015 at 8:40 AM, David Ortiz <dpo5003@gmail.com> wrote:

> Also, are you sure GenericData.Record is the correct class? When I use
> Avro to build my records, they normally end up as a SpecificRecord
> rather than a GenericRecord.
>
> On Mon, Mar 16, 2015 at 11:38 AM David Ortiz <dpo5003@gmail.com> wrote:
>
>> Mattijs,
>>
>>      Any particular reason you're taking that approach rather than
>> something like...
>>
>>
>> keyedStagedLogs = coalescedStagedLogs.parallelDo(
>>   "sort-pre",
>>   new MapFn<OurAvroType, Pair<Long, OurAvroType>>() {
>>
>>     private static final long serialVersionUID = 1L;
>>     @Override
>>     public Pair<Long, OurAvroType> map(OurAvroType input) {
>>       Long record_ts_key = (Long) input.get(partition_time_sourcename);
>>       return Pair.of(record_ts_key, input);
>>     }
>>   },
>>   tf.pairs(tf.longs(), Avros.records(OurAvroType.class))
>> );
>>
>> Thanks,
>>      Dave
>>
>>
>> On Mon, Mar 16, 2015 at 11:35 AM Mattijs Jonker <m.jonker@utwente.nl> wrote:
>>
>>> Hello,
>>>
>>> I am trying to sort Avro data on a given field in a Crunch pipeline.
>>> Since the field in question (a timestamp) is not the first field in the
>>> Avro schema (and hence does not primarily determine the default sort
>>> order), I first map each Record to a Pair<Long, Record> so that
>>> Sort.sort orders on the desired field. My code below [2] is loosely
>>> inspired by the first DoFn in [1].
>>>
>>> Unfortunately, I run into a ClassCastException [3] that I find hard to
>>> solve on my own. I do not fully understand how types are resolved at
>>> runtime, but my guess is that, based on the name and namespace in the
>>> schema, the first MapFn produces a namespace.OurAvroDataClass object
>>> (which is a SpecificRecord) rather than a GenericData.Record.
>>>
>>> I would appreciate it if somebody could hint at how to overcome the
>>> exception. An alternative way to achieve this sorting is also welcome.
>>>
>>> Sincerely,
>>>
>>> Mattijs
>>>
>>>
>>> [1]
>>> http://blog.cloudera.com/blog/2014/05/how-to-process-time-series-data-using-apache-crunch/
>>>
>>> [2]
>>>
>>> PTypeFamily tf = coalescedStagedLogs.getTypeFamily();
>>> keyedStagedLogs = coalescedStagedLogs.parallelDo(
>>>   "sort-pre",
>>>   new MapFn<Record, Pair<Long, Record>>() {
>>>     private static final long serialVersionUID = 1L;
>>>     @Override
>>>     public Pair<Long, Record> map(Record input) {
>>>       Long record_ts_key = (Long) input.get(partition_time_sourcename);
>>>       return Pair.of(record_ts_key, input);
>>>     }
>>>   },
>>>   tf.pairs(tf.longs(), Avros.generics(schema))
>>> );
>>>
>>> sortedKeyedStagedLogs = Sort.sort(keyedStagedLogs, Sort.Order.ASCENDING); // Sort
>>>
>>> sortedStagedLogs =  sortedKeyedStagedLogs.parallelDo(
>>>   "sort-post",
>>>   new MapFn<Pair<Long, Record>, Record>() {
>>>     private static final long serialVersionUID = 1L;
>>>     @Override
>>>     public Record map(Pair<Long, Record> input) {
>>>       return new Record(input.second(), true);
>>>     }
>>>   },
>>>   Avros.generics(schema)
>>> );
>>>
>>> [3]
>>> Caused by: java.lang.ClassCastException: namespace.OurAvroDataClass cannot be cast to org.apache.avro.generic.GenericData$Record
>>>     at namespace.OurCrunchToolClass$2.map(OurCrunchToolClass.java:ln)
>>>
>>> ln => return new Record(input.second(), true);
>>>
>>
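
For readers who want the shape of the keyed-sort workaround from the original question without the Crunch and Avro machinery, here is a plain-Java sketch of its three steps: key each record by its timestamp ("sort-pre"), sort on that key, then drop the key ("sort-post"). `LogRecord`, `host`, and `timestamp` are hypothetical names, and a local `List.sort` over `Map.Entry` pairs stands in for the distributed `Sort.sort` over `Pair<Long, Record>`, so this only illustrates the data flow, not Crunch's API.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class KeyedSortDemo {
    // Hypothetical record; the timestamp is deliberately not the first field,
    // mirroring the schema described in the question.
    static final class LogRecord {
        final String host;
        final long timestamp;

        LogRecord(String host, long timestamp) {
            this.host = host;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return host + "@" + timestamp;
        }
    }

    // Step 1 ("sort-pre"): pair each record with the key to sort on.
    static List<Map.Entry<Long, LogRecord>> keyByTimestamp(List<LogRecord> records) {
        List<Map.Entry<Long, LogRecord>> keyed = new ArrayList<>();
        for (LogRecord r : records) {
            keyed.add(new SimpleEntry<>(r.timestamp, r));
        }
        return keyed;
    }

    static List<LogRecord> sortByTimestamp(List<LogRecord> records) {
        List<Map.Entry<Long, LogRecord>> keyed = keyByTimestamp(records);
        // Step 2: sort ascending on the key only (List.sort is stable, so
        // records with equal timestamps keep their input order).
        keyed.sort(Map.Entry.comparingByKey());
        // Step 3 ("sort-post"): drop the key and keep the stored record object
        // as-is, without converting it to another concrete class.
        List<LogRecord> sorted = new ArrayList<>();
        for (Map.Entry<Long, LogRecord> e : keyed) {
            sorted.add(e.getValue());
        }
        return sorted;
    }

    public static void main(String[] args) {
        List<LogRecord> logs = new ArrayList<>();
        logs.add(new LogRecord("b", 300L));
        logs.add(new LogRecord("a", 100L));
        logs.add(new LogRecord("c", 200L));
        System.out.println(sortByTimestamp(logs)); // [a@100, c@200, b@300]
    }
}
```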


-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
