crunch-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Mattijs Jonker <m.jon...@utwente.nl>
Subject Re: How to keep schema-derived AvroType in MapFn generic?
Date Mon, 16 Mar 2015 18:21:14 GMT
By your pointing out that GenericData.Record is not a parent of
SpecificRecord, and by using coalescedStagedLogs.getPType() rather than
Avros.specifics(class), I managed to get it to work.

Thank you both!

On 16-03-15 18:58, Josh Wills wrote:
> Try "GenericRecord", which is the parent interface of both:
> 
> https://avro.apache.org/docs/1.7.6/api/java/org/apache/avro/generic/GenericRecord.html
> 
> On Mon, Mar 16, 2015 at 10:53 AM, Mattijs Jonker <m.jonker@utwente.nl
> <mailto:m.jonker@utwente.nl>> wrote:
> 
>     Thank you both David and Josh for responding so swiftly.
> 
>     Yes, the generated OurAvroDataClass is a SpecificRecord, but what isn't
>     shown by my code snippet is that OurCrunchToolClass is not exclusively
>     meant for OurAvroDataClass. As such, OurAvroDataClass cannot be used in
>     the MapFns in the proposed way.
> 
>     I have tried to work with specifics rather than generics by passing down
>     the type of OurAvroDataClass (as Class<R extends SpecificRecord>) to
>     OurCrunchToolClass. This is necessary for use in Avros.specifics(class).
>     Taking this route I end up with an onfortunate [1].
> 
> 
>     [1]
>     org.apache.avro.generic.GenericData$Record cannot be cast to
>     org.apache.avro.specific.SpecificRecord
> 
>     On 16-03-15 17:30, Josh Wills wrote:
>     > David's version should fix the error; the problem is that your
>     > namespace.OurAvroDataClass isn't a subclass of GenericData.Record.
>     >
>     > On Mon, Mar 16, 2015 at 8:40 AM, David Ortiz <dpo5003@gmail.com <mailto:dpo5003@gmail.com>
>     > <mailto:dpo5003@gmail.com <mailto:dpo5003@gmail.com>>> wrote:
>     >
>     >     Also, are you sure GenericData.Record is the correct class?  I know
>     >     when I use avro to build my records they normally end up as a
>     >     SpecificRecord rather than a GenericRecord.
>     >
>     >     On Mon, Mar 16, 2015 at 11:38 AM David Ortiz <dpo5003@gmail.com <mailto:dpo5003@gmail.com>
>     >     <mailto:dpo5003@gmail.com <mailto:dpo5003@gmail.com>>> wrote:
>     >
>     >         Mattijs,
>     >
>     >              Any particular reason you're taking that approach rather
>     >         than something like...
>     >
>     >
>     >         keyedStagedLogs = coalescedStagedLogs.__parallelDo__(
>     >           "sort-pre",
>     >           new MapFn<OurAvroType, Pair<Long, OurAvroType>>() {
>     >
>     >             private static final long serialVersionUID = 1L;
>     >             @Override
>     >             public Pair<Long, OurAvroType> map(OurAvroTypeinput) {
>     >
>     >             Long record_ts_key =
>     >         (Long)input.get(partition___time___sourcename);
>     >               return Pair.of(record_ts_key, input);
>     >             }
>     >           },
>     >           tf.pairs(tf.longs(), Avros.records(OurAvroType.class))
>     >         );
>     >
>     >         Thanks,
>     >              Dave
>     >
>     >
>     >         On Mon, Mar 16, 2015 at 11:35 AM Mattijs Jonker
>     >         <m.jonker@utwente.nl <mailto:m.jonker@utwente.nl>
>     <mailto:m.jonker@utwente.nl <mailto:m.jonker@utwente.nl>>> wrote:
>     >
>     >             Hello,
>     >
>     >             I am trying to sort Avro data based on a given field
>     in a Crunch
>     >             pipeline. Since the field in question (a timestamp)
>     does not
>     >             come first
>     >             in the Avro schema (and hence does not dictate
>     primarily the
>     >             normal sort
>     >             order), I map the Record to a Pair<Long, Record> first to
>     >             Sort.sort on
>     >             the desired field. My below code [2] is loosely
>     inspired by
>     >             the first
>     >             DoFn in [1].
>     >
>     >             Unfortunately, I encounter a ClassCastException [3] that I
>     >             find hard to
>     >             solve on my own. I do not fully understand the way
>     types are
>     >             handled at
>     >             runtime, but my guess is that based on the name and
>     >             namespace in the
>     >             schema, the first MapFn results in a
>     >             namespace.OurAvroDataClass Object
>     >             (which is a SpecificRecord).
>     >
>     >             I would appreciate it if somebody can hint at how to
>     >             overcome the
>     >             exception. An alternative method to achieve this
>     sorting is
>     >             also welcome.
>     >
>     >             Sincerely,
>     >
>     >             Mattijs
>     >
>     >
>     >             [1]
>     >           
>      http://blog.cloudera.com/blog/____2014/05/how-to-process-time-__se__ries-data-using-apache-__crunch/
>     >           
>      <http://blog.cloudera.com/blog/2014/05/how-to-process-time-series-data-using-apache-crunch/>
>     >
>     >             [2]
>     >
>     >             PTypeFamily tf = coalescedStagedLogs.__getTypeFam__ily();
>     >             keyedStagedLogs = coalescedStagedLogs.__parallelDo__(
>     >               "sort-pre",
>     >               new MapFn<Record, Pair<Long, Record>>() {
>     >                 private static final long serialVersionUID = 1L;
>     >                 @Override
>     >                 public Pair<Long, Record> map(Record input) {
>     >                 Long record_ts_key =
>     >             (Long)input.get(partition___time___sourcename);
>     >                   return Pair.of(record_ts_key, input);
>     >                 }
>     >               },
>     >               tf.pairs(tf.longs(), Avros.generics(schema))
>     >             );
>     >
>     >             sortedKeyedStagedLogs = Sort.sort(keyedStagedLogs,
>     >             Sort.Order.ASCENDING); // Sort
>     >
>     >             sortedStagedLogs =  sortedKeyedStagedLogs.__parallel__Do(
>     >               "sort-post",
>     >               new MapFn<Pair<Long, Record>, Record>() {
>     >                 private static final long serialVersionUID = 1L;
>     >                 @Override
>     >                 public Record map(Pair<Long, Record> input) {
>     >                   return new Record(input.second(), true);
>     >                 }
>     >               },
>     >               Avros.generics(schema)
>     >             );
>     >
>     >             [3]
>     >             Caused by: java.lang.ClassCastException:
>     >             namespace.OurAvroDataClass
>     >             cannot be cast to
>     >             org.apache.avro.generic.__Generi__cData$Record at
>     >           
>      namespace.OurCrunchToolClass$__2__.map(OurCrunchToolClass.java:__l__n)
>     >
>     >             ln => return new Record(input.second(), true);
>     >
>     >
>     >
>     >
>     > --
>     > Director of Data Science
>     > Cloudera <http://www.cloudera.com>
>     > Twitter: @josh_wills <http://twitter.com/josh_wills>
> 
> 
> 
> 
> -- 
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>

Mime
View raw message