From: David Ortiz <dpo5003@gmail.com>
Date: Mon, 16 Mar 2015 15:40:40 +0000
Subject: Re: How to keep schema-derived AvroType in MapFn generic?
To: user@crunch.apache.org

Also, are you sure GenericData.Record is the correct class? I know that when I use Avro to build my records, they normally end up as a SpecificRecord rather than a GenericRecord.
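
This question points at the likely cause of [3]. A cast to the concrete GenericData.Record class fails whenever the runtime object is an instance of a generated class, even though both kinds of object can be read the same way. The plain-Java sketch below models that situation without Avro. `RecordLike`, `GenericLike` and `SpecificLike` are hypothetical stand-ins for the `GenericRecord` interface, `GenericData.Record`, and a generated class such as `namespace.OurAvroDataClass`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Avro's GenericRecord interface.
interface RecordLike {
    Object get(String field);
}

// Hypothetical stand-in for GenericData.Record (a concrete class).
class GenericLike implements RecordLike {
    private final Map<String, Object> fields = new HashMap<>();

    GenericLike put(String field, Object value) {
        fields.put(field, value);
        return this;
    }

    public Object get(String field) {
        return fields.get(field);
    }
}

// Hypothetical stand-in for a generated class such as OurAvroDataClass.
class SpecificLike implements RecordLike {
    private final long timestamp;

    SpecificLike(long timestamp) {
        this.timestamp = timestamp;
    }

    public Object get(String field) {
        return "timestamp".equals(field) ? timestamp : null;
    }
}

class CastDemo {
    // Mirrors the failing line: assumes the value is the concrete generic class.
    static boolean castToConcreteSucceeds(Object value) {
        try {
            GenericLike g = (GenericLike) value;
            return g != null;
        } catch (ClassCastException e) {
            return false;
        }
    }

    // Works for both classes because it relies only on the shared interface.
    static Object readThroughInterface(Object value, String field) {
        return ((RecordLike) value).get(field);
    }

    public static void main(String[] args) {
        Object fromReader = new SpecificLike(1000L);
        System.out.println(castToConcreteSucceeds(fromReader));            // false
        System.out.println(readThroughInterface(fromReader, "timestamp")); // 1000
    }
}
```

In recent Avro releases, generated classes extend `SpecificRecordBase`, which also implements `GenericRecord`. If that holds for the Avro version in use, reading values through the `GenericRecord` interface instead of casting to `GenericData.Record` may sidestep the exception; treat this as an assumption to verify.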

On Mon, Mar 16, 2015 at 11:38 AM David Ortiz <dpo5003@gmail.com> wrote:
Mattijs,

Any particular reason you're taking that approach rather than something like...


keyedStagedLogs = coalescedStagedLogs.parallelDo(
  "sort-pre",
  new MapFn<OurAvroType, Pair<Long, OurAvroType>>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Pair<Long, OurAvroType> map(OurAvroType input) {
      // Key each record by its timestamp field so Sort.sort can order on it.
      Long record_ts_key = (Long) input.get(partition_time_sourcename);
      return Pair.of(record_ts_key, input);
    }
  },
  tf.pairs(tf.longs(), Avros.records(OurAvroType.class))
);
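
The idea here is to type the whole chain on the generated class, so the value never has to be cast back to GenericData.Record after the sort. The same pre-sort, sort, post-sort shape can be sketched in plain Java without Crunch. `LogEntry` is a hypothetical stand-in for OurAvroType, `Map.Entry` stands in for Crunch's `Pair`, and an in-memory `List.sort` stands in for `Sort.sort`, so this shows only the data flow, not Crunch's distributed sort.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the generated class (OurAvroType in the thread).
final class LogEntry {
    final String host;
    final long timestamp;

    LogEntry(String host, long timestamp) {
        this.host = host;
        this.timestamp = timestamp;
    }
}

class KeyedSortSketch {
    // "sort-pre": key each record by its timestamp; the value keeps its concrete type.
    static List<Map.Entry<Long, LogEntry>> keyByTimestamp(List<LogEntry> logs) {
        List<Map.Entry<Long, LogEntry>> keyed = new ArrayList<>();
        for (LogEntry e : logs) {
            keyed.add(new SimpleImmutableEntry<>(e.timestamp, e));
        }
        return keyed;
    }

    // Stand-in for Sort.sort(table, Sort.Order.ASCENDING): order by key only.
    static void sortAscending(List<Map.Entry<Long, LogEntry>> keyed) {
        keyed.sort(Map.Entry.comparingByKey());
    }

    // "sort-post": drop the key; no cast is needed because the value type never changed.
    static List<LogEntry> dropKeys(List<Map.Entry<Long, LogEntry>> keyed) {
        List<LogEntry> out = new ArrayList<>();
        for (Map.Entry<Long, LogEntry> kv : keyed) {
            out.add(kv.getValue());
        }
        return out;
    }

    static List<LogEntry> sortByTimestamp(List<LogEntry> logs) {
        List<Map.Entry<Long, LogEntry>> keyed = keyByTimestamp(logs);
        sortAscending(keyed);
        return dropKeys(keyed);
    }

    public static void main(String[] args) {
        List<LogEntry> logs = new ArrayList<>();
        logs.add(new LogEntry("b", 30L));
        logs.add(new LogEntry("a", 10L));
        logs.add(new LogEntry("c", 20L));
        for (LogEntry e : sortByTimestamp(logs)) {
            System.out.println(e.host + " " + e.timestamp);
        }
    }
}
```

Because the value's static type is the concrete class throughout, the "sort-post" step is a plain unwrap rather than a copy into a different record class.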

Thanks,
Dave


On Mon, Mar 16, 2015 at 11:35 AM Mattijs Jonker <m.jonker@utwente.nl> wrote:
Hello,

I am trying to sort Avro data on a given field in a Crunch
pipeline. Since the field in question (a timestamp) does not come first
in the Avro schema (and hence does not primarily determine the default sort
order), I first map each Record to a Pair<Long, Record> so that Sort.sort
orders on the desired field. My code below [2] is loosely inspired by the
first DoFn in [1].
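
The reasoning here rests on how Avro orders records: per the Avro specification's sort-order rules, records compare field by field in schema order, so a leading field dominates. The plain-Java sketch below (no Avro or Crunch; `StagedLog`, its two fields, and both comparators are hypothetical) contrasts a schema-order comparison with sorting on an extracted timestamp key.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical record whose "schema" declares host first and timestamp second.
final class StagedLog {
    final String host;
    final long timestamp;

    StagedLog(String host, long timestamp) {
        this.host = host;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return host + "@" + timestamp;
    }
}

class OrderSketch {
    // Models a field-by-field comparison in declaration order: host decides first.
    static final Comparator<StagedLog> SCHEMA_ORDER =
            Comparator.comparing((StagedLog s) -> s.host).thenComparingLong(s -> s.timestamp);

    // Models sorting on an extracted key, as the Pair<Long, Record> approach does.
    static final Comparator<StagedLog> BY_TIMESTAMP =
            Comparator.comparingLong(s -> s.timestamp);

    static List<String> sorted(List<StagedLog> logs, Comparator<StagedLog> order) {
        List<StagedLog> copy = new ArrayList<>(logs);
        copy.sort(order);
        List<String> out = new ArrayList<>();
        for (StagedLog s : copy) {
            out.add(s.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        List<StagedLog> logs = new ArrayList<>();
        logs.add(new StagedLog("b", 10L));
        logs.add(new StagedLog("a", 30L));
        logs.add(new StagedLog("a", 20L));
        System.out.println(sorted(logs, SCHEMA_ORDER)); // [a@20, a@30, b@10]
        System.out.println(sorted(logs, BY_TIMESTAMP)); // [b@10, a@20, a@30]
    }
}
```

In the first ordering, `host` decides before the timestamp is consulted, which is why keying on the timestamp is needed when it is not the leading field.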

Unfortunately, I run into a ClassCastException [3] that I have not managed to
solve on my own. I do not fully understand how types are handled at
runtime, but my guess is that, based on the name and namespace in the
schema, the first MapFn produces a namespace.OurAvroDataClass object
(which is a SpecificRecord).

I would appreciate any hints on how to overcome the
exception. An alternative way to achieve this sorting is also welcome.

Sincerely,

Mattijs


[1]
http://blog.cloudera.com/blog/2014/05/how-to-process-time-series-data-using-apache-crunch/

[2]

PTypeFamily tf = coalescedStagedLogs.getTypeFamily();
keyedStagedLogs = coalescedStagedLogs.parallelDo(
  "sort-pre",
  new MapFn<Record, Pair<Long, Record>>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Pair<Long, Record> map(Record input) {
      Long record_ts_key = (Long) input.get(partition_time_sourcename);
      return Pair.of(record_ts_key, input);
    }
  },
  tf.pairs(tf.longs(), Avros.generics(schema))
);

sortedKeyedStagedLogs = Sort.sort(keyedStagedLogs, Sort.Order.ASCENDING); // Sort

sortedStagedLogs = sortedKeyedStagedLogs.parallelDo(
  "sort-post",
  new MapFn<Pair<Long, Record>, Record>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Record map(Pair<Long, Record> input) {
      return new Record(input.second(), true);
    }
  },
  Avros.generics(schema)
);

[3]
Caused by: java.lang.ClassCastException: namespace.OurAvroDataClass cannot be cast to org.apache.avro.generic.GenericData$Record
    at namespace.OurCrunchToolClass$2.map(OurCrunchToolClass.java:ln)

ln => return new Record(input.second(), true);