crunch-user mailing list archives

From David Ortiz <dpo5...@gmail.com>
Subject Re: How to keep schema-derived AvroType in MapFn generic?
Date Mon, 16 Mar 2015 15:38:09 GMT
Mattijs,

     Any particular reason you're taking that approach rather than
something like...


keyedStagedLogs = coalescedStagedLogs.parallelDo(
  "sort-pre",
  new MapFn<OurAvroType, Pair<Long, OurAvroType>>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Pair<Long, OurAvroType> map(OurAvroType input) {
      Long record_ts_key = (Long) input.get(partition_time_sourcename);
      return Pair.of(record_ts_key, input);
    }
  },
  tf.pairs(tf.longs(), Avros.records(OurAvroType.class))
);

Thanks,
     Dave


On Mon, Mar 16, 2015 at 11:35 AM Mattijs Jonker <m.jonker@utwente.nl> wrote:

> Hello,
>
> I am trying to sort Avro data based on a given field in a Crunch
> pipeline. Since the field in question (a timestamp) does not come first
> in the Avro schema (and hence is not the primary key of the normal sort
> order), I first map the Record to a Pair<Long, Record> so that Sort.sort
> operates on the desired field. My code below [2] is loosely inspired by
> the first DoFn in [1].
>
> Unfortunately, I encounter a ClassCastException [3] that I find hard to
> solve on my own. I do not fully understand the way types are handled at
> runtime, but my guess is that based on the name and namespace in the
> schema, the first MapFn results in a namespace.OurAvroDataClass Object
> (which is a SpecificRecord).
>
> I would appreciate it if somebody can hint at how to overcome the
> exception. An alternative method to achieve this sorting is also welcome.
>
> Sincerely,
>
> Mattijs
>
>
> [1]
> http://blog.cloudera.com/blog/2014/05/how-to-process-time-series-data-using-apache-crunch/
>
> [2]
>
> PTypeFamily tf = coalescedStagedLogs.getTypeFamily();
> keyedStagedLogs = coalescedStagedLogs.parallelDo(
>   "sort-pre",
>   new MapFn<Record, Pair<Long, Record>>() {
>     private static final long serialVersionUID = 1L;
>     @Override
>     public Pair<Long, Record> map(Record input) {
>       Long record_ts_key = (Long) input.get(partition_time_sourcename);
>       return Pair.of(record_ts_key, input);
>     }
>   },
>   tf.pairs(tf.longs(), Avros.generics(schema))
> );
>
> sortedKeyedStagedLogs = Sort.sort(keyedStagedLogs, Sort.Order.ASCENDING); // Sort
>
> sortedStagedLogs = sortedKeyedStagedLogs.parallelDo(
>   "sort-post",
>   new MapFn<Pair<Long, Record>, Record>() {
>     private static final long serialVersionUID = 1L;
>     @Override
>     public Record map(Pair<Long, Record> input) {
>       return new Record(input.second(), true);
>     }
>   },
>   Avros.generics(schema)
> );
>
> [3]
> Caused by: java.lang.ClassCastException: namespace.OurAvroDataClass
> cannot be cast to org.apache.avro.generic.GenericData$Record at
> namespace.OurCrunchToolClass$2.map(OurCrunchToolClass.java:ln)
>
> ln => return new Record(input.second(), true);
>
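
The exception in [3] can be reproduced in miniature without Avro or Crunch. The sketch below is plain Java and only illustrates the mechanism: FieldAccess, GenericLike and SpecificLike are hypothetical stand-ins, not Avro classes. It shows that a cast to one concrete record class fails when the runtime hands back a sibling class, while code written against a shared interface keeps working. In Avro itself, GenericData.Record and generated record classes both implement org.apache.avro.generic.IndexedRecord (positional get(int)); whether typing the MapFn on that interface suits this pipeline is an untested assumption.

```java
import java.util.HashMap;
import java.util.Map;

public class CastSketch {
    /** Hypothetical stand-in for an interface shared by both record flavours. */
    interface FieldAccess {
        Object get(String fieldName);
    }

    /** Stand-in for a schema-driven generic record (not Avro's class). */
    static final class GenericLike implements FieldAccess {
        private final Map<String, Object> fields = new HashMap<>();

        GenericLike put(String name, Object value) {
            fields.put(name, value);
            return this;
        }

        @Override
        public Object get(String fieldName) {
            return fields.get(fieldName);
        }
    }

    /** Stand-in for a generated (specific) record class. */
    static final class SpecificLike implements FieldAccess {
        private final long timestamp;

        SpecificLike(long timestamp) {
            this.timestamp = timestamp;
        }

        @Override
        public Object get(String fieldName) {
            return "timestamp".equals(fieldName) ? Long.valueOf(timestamp) : null;
        }
    }

    /** Mirrors the failing pattern: assumes the concrete generic class. */
    static boolean castToGenericFails(Object value) {
        try {
            GenericLike ignored = (GenericLike) value;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    /** Works for either flavour because it relies only on the shared interface. */
    static long timestampOf(FieldAccess record) {
        return (Long) record.get("timestamp");
    }

    public static void main(String[] args) {
        // The runtime hands back the "specific" flavour, as in the stack trace.
        Object fromRuntime = new SpecificLike(1426520289L);
        System.out.println(castToGenericFails(fromRuntime));
        System.out.println(timestampOf((FieldAccess) fromRuntime));
    }
}
```

Running main prints true for the failed cast and then the timestamp read through the interface.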
