crunch-user mailing list archives

From Mattijs Jonker <m.jon...@utwente.nl>
Subject Re: How to keep schema-derived AvroType in MapFn generic?
Date Mon, 16 Mar 2015 17:53:55 GMT
Thank you both, David and Josh, for responding so swiftly.

Yes, the generated OurAvroDataClass is a SpecificRecord. What my code
snippet does not show, however, is that OurCrunchToolClass is not meant
exclusively for OurAvroDataClass. As such, I cannot hard-code
OurAvroDataClass in the MapFns as proposed.

I have tried to work with specific records rather than generic ones by
passing the class of OurAvroDataClass (as a Class<R>, with R extends
SpecificRecord) down to OurCrunchToolClass, which is needed for
Avros.specifics(class). Taking this route, I end up with the unfortunate
exception in [1].
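A minimal plain-Java sketch of why passing a class token down does not help by itself. It assumes, as the exception in [1] suggests, that the records reaching the MapFn were deserialized as generic records. `GenericRec` and `SpecificRec` are hypothetical stand-ins for `GenericData.Record` and a generated class such as OurAvroDataClass. `readRecord` is a hypothetical stand-in for whatever datum reader the input was configured with. The sketch shows that an object's runtime class is fixed when it is read. `Class.cast` can only accept or reject that object; it never converts it.

```java
// Hypothetical stand-in for org.apache.avro.generic.GenericData.Record.
class GenericRec {
    final long timestamp;
    GenericRec(long timestamp) { this.timestamp = timestamp; }
}

// Hypothetical stand-in for a generated SpecificRecord class (e.g. OurAvroDataClass).
class SpecificRec {
    final long timestamp;
    SpecificRec(long timestamp) { this.timestamp = timestamp; }
}

class TypeTokenDemo {
    // Hypothetical reader: its configuration, not the caller's type parameter,
    // decides which class the deserialized object has.
    static Object readRecord(boolean readerIsSpecific, long timestamp) {
        return readerIsSpecific ? new SpecificRec(timestamp) : new GenericRec(timestamp);
    }

    // A class token passed down can only check the object: Class.cast returns
    // the same reference or throws ClassCastException; it never converts.
    static <R> R asType(Object record, Class<R> type) {
        return type.cast(record);
    }

    public static void main(String[] args) {
        try {
            asType(readRecord(false, 42L), SpecificRec.class);
            System.out.println("accepted generic record");
        } catch (ClassCastException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        SpecificRec ok = asType(readRecord(true, 7L), SpecificRec.class);
        System.out.println("accepted specific record with timestamp " + ok.timestamp);
    }
}
```

If this is what happens here, the class token would have to influence how the input is read (its PType), not only how records are cast afterwards. This sketch does not model how Crunch chooses the reader.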


[1]
org.apache.avro.generic.GenericData$Record cannot be cast to
org.apache.avro.specific.SpecificRecord
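Both casts in this thread fail the same way: a record of one concrete class reaches code compiled against a sibling class. The first is the cast in [1] just above. The second is the cast in [3] in the quoted original message. A minimal plain-Java sketch of that mechanism follows. `Rec` (a shared interface), `GenericRec`, `SpecificRec`, `mapAll` and `mislabel` are all hypothetical. Because of type erasure, nothing checks the element type until a function compiled against the concrete class receives the object. That is consistent with the trace in [3] pointing into `map`. A function written against the shared interface accepts either class. In Avro the analogous interface would presumably be `GenericRecord`. Whether the generated class implements it (e.g. via `SpecificRecordBase`) should be checked for the Avro version in use.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical shared interface (the role GenericRecord would play in Avro).
interface Rec {
    long timestamp();
}

// Hypothetical sibling implementations: neither is a subclass of the other.
class GenericRec implements Rec {
    private final long ts;
    GenericRec(long ts) { this.ts = ts; }
    public long timestamp() { return ts; }
}

class SpecificRec implements Rec {
    private final long ts;
    SpecificRec(long ts) { this.ts = ts; }
    public long timestamp() { return ts; }
}

class ErasureDemo {
    // Generic over R: after erasure, this loop never checks that elements really are R.
    static <R, T> List<T> mapAll(List<R> input, Function<? super R, ? extends T> fn) {
        List<T> out = new ArrayList<>();
        for (R r : input) {
            out.add(fn.apply(r));
        }
        return out;
    }

    // Simulates a declared element type that disagrees with what was actually read.
    @SuppressWarnings("unchecked")
    static List<GenericRec> mislabel(List<SpecificRec> actual) {
        return (List<GenericRec>) (List<?>) actual;
    }

    public static void main(String[] args) {
        List<GenericRec> declaredGeneric =
            mislabel(Arrays.asList(new SpecificRec(3L), new SpecificRec(1L)));

        // Written against the concrete class: the cast fails inside the function.
        try {
            mapAll(declaredGeneric, g -> g.timestamp());
        } catch (ClassCastException e) {
            System.out.println("concrete-class function failed: " + e.getMessage());
        }

        // Written against the shared interface: works whichever class was read.
        Function<Rec, Long> byInterface = Rec::timestamp;
        System.out.println(mapAll(declaredGeneric, byInterface));
    }
}
```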

On 16-03-15 17:30, Josh Wills wrote:
> David's version should fix the error; the problem is that your
> namespace.OurAvroDataClass isn't a subclass of GenericData.Record.
> 
> On Mon, Mar 16, 2015 at 8:40 AM, David Ortiz <dpo5003@gmail.com> wrote:
> 
>     Also, are you sure GenericData.Record is the correct class? I know
>     when I use Avro to build my records they normally end up as a
>     SpecificRecord rather than a GenericRecord.
> 
>     On Mon, Mar 16, 2015 at 11:38 AM David Ortiz <dpo5003@gmail.com> wrote:
> 
>         Mattijs,
> 
>              Any particular reason you're taking that approach rather
>         than something like...
> 
> 
>         keyedStagedLogs = coalescedStagedLogs.parallelDo(
>           "sort-pre",
>           new MapFn<OurAvroType, Pair<Long, OurAvroType>>() {
>             private static final long serialVersionUID = 1L;
>
>             @Override
>             public Pair<Long, OurAvroType> map(OurAvroType input) {
>               Long record_ts_key = (Long) input.get(partition_time_sourcename);
>               return Pair.of(record_ts_key, input);
>             }
>           },
>           tf.pairs(tf.longs(), Avros.records(OurAvroType.class))
>         );
> 
>         Thanks,
>              Dave
> 
> 
>         On Mon, Mar 16, 2015 at 11:35 AM Mattijs Jonker <m.jonker@utwente.nl> wrote:
> 
>             Hello,
> 
>             I am trying to sort Avro data on a given field in a Crunch
>             pipeline. Since the field in question (a timestamp) does not
>             come first in the Avro schema (and hence does not primarily
>             determine the normal sort order), I first map each Record to a
>             Pair<Long, Record> so that Sort.sort orders on the desired
>             field. My code below [2] is loosely inspired by the first DoFn
>             in [1].
> 
>             Unfortunately, I encounter a ClassCastException [3] that I find
>             hard to solve on my own. I do not fully understand how types
>             are handled at runtime, but my guess is that, based on the name
>             and namespace in the schema, the first MapFn produces a
>             namespace.OurAvroDataClass object (which is a SpecificRecord)
>             rather than a GenericData.Record.
> 
>             I would appreciate it if somebody could hint at how to overcome
>             the exception. An alternative way to achieve this sorting is
>             also welcome.
> 
>             Sincerely,
> 
>             Mattijs
> 
> 
>             [1]
>             http://blog.cloudera.com/blog/2014/05/how-to-process-time-series-data-using-apache-crunch/
> 
>             [2]
> 
>             PTypeFamily tf = coalescedStagedLogs.getTypeFamily();
>             keyedStagedLogs = coalescedStagedLogs.parallelDo(
>               "sort-pre",
>               new MapFn<Record, Pair<Long, Record>>() {
>                 private static final long serialVersionUID = 1L;
>                 @Override
>                 public Pair<Long, Record> map(Record input) {
>                   Long record_ts_key = (Long) input.get(partition_time_sourcename);
>                   return Pair.of(record_ts_key, input);
>                 }
>               },
>               tf.pairs(tf.longs(), Avros.generics(schema))
>             );
>
>             sortedKeyedStagedLogs = Sort.sort(keyedStagedLogs, Sort.Order.ASCENDING); // Sort
>
>             sortedStagedLogs = sortedKeyedStagedLogs.parallelDo(
>               "sort-post",
>               new MapFn<Pair<Long, Record>, Record>() {
>                 private static final long serialVersionUID = 1L;
>                 @Override
>                 public Record map(Pair<Long, Record> input) {
>                   return new Record(input.second(), true);
>                 }
>               },
>               Avros.generics(schema)
>             );
> 
>             [3]
>             Caused by: java.lang.ClassCastException: namespace.OurAvroDataClass
>             cannot be cast to org.apache.avro.generic.GenericData$Record
>                 at namespace.OurCrunchToolClass$2.map(OurCrunchToolClass.java:ln)
>
>             ln => return new Record(input.second(), true);
> 
> 
> 
> 
> -- 
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>
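The approach in the original question (key each record by its timestamp, sort on that key, then drop the key) is the decorate-sort-undecorate pattern. Below is a minimal in-memory sketch of it in plain Java, not Crunch code. `keyBy`, `sortByKey` and `values` correspond loosely to the "sort-pre" MapFn, `Sort.sort(..., Sort.Order.ASCENDING)` and the "sort-post" MapFn. All names, including the `LogLine` record, are hypothetical. The record type stays a type parameter `R` and the caller supplies the key as a `ToLongFunction`, so nothing in the helper depends on a particular record class.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.ToLongFunction;

// Hypothetical record type used only for the usage example.
class LogLine {
    final long ts;
    final String msg;
    LogLine(long ts, String msg) { this.ts = ts; this.msg = msg; }
}

class KeyedSortDemo {
    // "sort-pre": pair each record with its timestamp key.
    static <R> List<Map.Entry<Long, R>> keyBy(List<R> records, ToLongFunction<? super R> key) {
        List<Map.Entry<Long, R>> keyed = new ArrayList<>();
        for (R r : records) {
            keyed.add(new AbstractMap.SimpleImmutableEntry<>(key.applyAsLong(r), r));
        }
        return keyed;
    }

    // Sort step: ascending by key only (sorting an ArrayList is stable, so ties keep input order).
    static <R> void sortByKey(List<Map.Entry<Long, R>> keyed) {
        keyed.sort(Map.Entry.comparingByKey());
    }

    // "sort-post": drop the key and return the stored records themselves.
    static <R> List<R> values(List<Map.Entry<Long, R>> keyed) {
        List<R> out = new ArrayList<>();
        for (Map.Entry<Long, R> e : keyed) {
            out.add(e.getValue());
        }
        return out;
    }

    static <R> List<R> sortByTimestamp(List<R> records, ToLongFunction<? super R> key) {
        List<Map.Entry<Long, R>> keyed = keyBy(records, key);
        sortByKey(keyed);
        return values(keyed);
    }

    public static void main(String[] args) {
        List<LogLine> logs = Arrays.asList(
            new LogLine(30L, "c"), new LogLine(10L, "a"), new LogLine(20L, "b"));
        for (LogLine l : sortByTimestamp(logs, line -> line.ts)) {
            System.out.println(l.ts + " " + l.msg);
        }
    }
}
```

In a Crunch pipeline, a plausible way to stay generic would be to derive the pair type from the input collection's own PType (e.g. `tf.pairs(tf.longs(), input.getPType())`) rather than from `Avros.generics(schema)`. This is an untested suggestion, not something confirmed in this thread.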
