crunch-user mailing list archives

From David Ortiz <dpo5...@gmail.com>
Subject Re: How to keep schema-derived AvroType in MapFn generic?
Date Mon, 16 Mar 2015 15:40:40 GMT
Also, are you sure GenericData.Record is the correct class?  When I use
Avro to build my records, they normally end up as a SpecificRecord
rather than a GenericRecord.
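
To illustrate the question, here is a minimal sketch of why a cast to one
concrete record class can fail while a cast to a shared interface succeeds.
It deliberately uses stand-in types, not the real Avro classes:
IndexedRecordLike, GenericRecordLike, OurAvroDataClassLike and CastSketch
are all hypothetical names invented for this example, and the assumption is
only that the generic and the generated record classes share an interface
without one extending the other.

```java
// Stand-in types only: they mimic the relationship between a shared record
// interface, a generic record class and a generated specific class.
// None of these names are part of the Avro or Crunch APIs.

interface IndexedRecordLike {
    Object get(int fieldIndex);
}

// Stand-in for a generic record that stores its fields positionally.
final class GenericRecordLike implements IndexedRecordLike {
    private final Object[] values;

    GenericRecordLike(Object... values) {
        this.values = values.clone();
    }

    @Override
    public Object get(int fieldIndex) {
        return values[fieldIndex];
    }
}

// Stand-in for a generated class with a single timestamp field at index 0.
final class OurAvroDataClassLike implements IndexedRecordLike {
    private final long timestamp;

    OurAvroDataClassLike(long timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public Object get(int fieldIndex) {
        if (fieldIndex == 0) {
            return timestamp; // boxed to Long
        }
        throw new IndexOutOfBoundsException("no field " + fieldIndex);
    }
}

class CastSketch {
    // Written against the concrete generic class: throws ClassCastException
    // when the runtime object is any other implementation.
    static long timestampViaConcreteCast(Object input) {
        GenericRecordLike record = (GenericRecordLike) input;
        return (Long) record.get(0);
    }

    // Written against the shared interface: works for both implementations.
    static long timestampViaInterface(Object input) {
        IndexedRecordLike record = (IndexedRecordLike) input;
        return (Long) record.get(0);
    }

    public static void main(String[] args) {
        Object runtimeValue = new OurAvroDataClassLike(1000L);
        System.out.println(timestampViaInterface(runtimeValue));
        try {
            timestampViaConcreteCast(runtimeValue);
        } catch (ClassCastException e) {
            System.out.println("cast failed: " + e.getClass().getSimpleName());
        }
    }
}
```

Whether the same remedy applies in the pipeline depends on which class the
runtime actually hands to the MapFn, so treat this as an illustration of the
failure mode rather than a fix.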

On Mon, Mar 16, 2015 at 11:38 AM David Ortiz <dpo5003@gmail.com> wrote:

> Mattijs,
>
>      Any particular reason you're taking that approach rather than
> something like...
>
>
> keyedStagedLogs = coalescedStagedLogs.parallelDo(
>   "sort-pre",
>   new MapFn<OurAvroType, Pair<Long, OurAvroType>>() {
>
>     private static final long serialVersionUID = 1L;
>     @Override
>     public Pair<Long, OurAvroType> map(OurAvroType input) {
>       Long record_ts_key = (Long) input.get(partition_time_sourcename);
>       return Pair.of(record_ts_key, input);
>     }
>   },
>   tf.pairs(tf.longs(), Avros.records(OurAvroType.class))
> );
>
> Thanks,
>      Dave
>
>
> On Mon, Mar 16, 2015 at 11:35 AM Mattijs Jonker <m.jonker@utwente.nl>
> wrote:
>
>> Hello,
>>
>> I am trying to sort Avro data based on a given field in a Crunch
>> pipeline. Since the field in question (a timestamp) does not come first
>> in the Avro schema (and hence is not the primary factor in the default
>> sort order), I first map each Record to a Pair<Long, Record> so that
>> Sort.sort orders on the desired field. My code below [2] is loosely
>> inspired by the first DoFn in [1].
>>
>> Unfortunately, I encounter a ClassCastException [3] that I find hard to
>> solve on my own. I do not fully understand the way types are handled at
>> runtime, but my guess is that based on the name and namespace in the
>> schema, the first MapFn results in a namespace.OurAvroDataClass Object
>> (which is a SpecificRecord).
>>
>> I would appreciate it if somebody can hint at how to overcome the
>> exception. An alternative method to achieve this sorting is also welcome.
>>
>> Sincerely,
>>
>> Mattijs
>>
>>
>> [1]
>> http://blog.cloudera.com/blog/2014/05/how-to-process-time-series-data-using-apache-crunch/
>>
>> [2]
>>
>> PTypeFamily tf = coalescedStagedLogs.getTypeFamily();
>> keyedStagedLogs = coalescedStagedLogs.parallelDo(
>>   "sort-pre",
>>   new MapFn<Record, Pair<Long, Record>>() {
>>     private static final long serialVersionUID = 1L;
>>     @Override
>>     public Pair<Long, Record> map(Record input) {
>>       Long record_ts_key = (Long) input.get(partition_time_sourcename);
>>       return Pair.of(record_ts_key, input);
>>     }
>>   },
>>   tf.pairs(tf.longs(), Avros.generics(schema))
>> );
>>
>> sortedKeyedStagedLogs = Sort.sort(keyedStagedLogs, Sort.Order.ASCENDING); // Sort
>>
>> sortedStagedLogs = sortedKeyedStagedLogs.parallelDo(
>>   "sort-post",
>>   new MapFn<Pair<Long, Record>, Record>() {
>>     private static final long serialVersionUID = 1L;
>>     @Override
>>     public Record map(Pair<Long, Record> input) {
>>       return new Record(input.second(), true);
>>     }
>>   },
>>   Avros.generics(schema)
>> );
>>
>> [3]
>> Caused by: java.lang.ClassCastException: namespace.OurAvroDataClass
>> cannot be cast to org.apache.avro.generic.GenericData$Record at
>> namespace.OurCrunchToolClass$2.map(OurCrunchToolClass.java:ln)
>>
>> ln => return new Record(input.second(), true);
>>
>
