From: Josh Wills
Date: Mon, 16 Mar 2015 10:58:48 -0700
Subject: Re: How to keep schema-derived AvroType in MapFn generic?
To: user@crunch.apache.org

Try "GenericRecord", which is the parent interface of both:
https://avro.apache.org/docs/1.7.6/api/java/org/apache/avro/generic/GenericRecord.html
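To illustrate the suggestion, here is a small, self-contained Java sketch. It deliberately does not use Crunch or Avro: `GenericRecordLike`, `GenericDataRecordLike` and `OurAvroDataClassLike` are hypothetical stand-ins for `GenericRecord`, `GenericData.Record` and a generated class such as `OurAvroDataClass`, which (per the reply above) share `GenericRecord` as their parent interface. Code typed against the shared interface accepts whichever class shows up at runtime, while a cast to one concrete class reproduces the kind of ClassCastException in Mattijs's stack trace.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.avro.generic.GenericRecord (not the real type).
interface GenericRecordLike {
    Object get(String field);
}

// Hypothetical stand-in for GenericData.Record: a field map driven by a schema.
class GenericDataRecordLike implements GenericRecordLike {
    private final Map<String, Object> fields = new HashMap<>();

    GenericDataRecordLike put(String field, Object value) {
        fields.put(field, value);
        return this;
    }

    @Override
    public Object get(String field) {
        return fields.get(field);
    }
}

// Hypothetical stand-in for a generated class such as OurAvroDataClass.
class OurAvroDataClassLike implements GenericRecordLike {
    private final long timestamp;

    OurAvroDataClassLike(long timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public Object get(String field) {
        return "timestamp".equals(field) ? Long.valueOf(timestamp) : null;
    }
}

class GenericRecordDemo {
    // Typed against the shared parent interface: works for either runtime class.
    static long keyOf(GenericRecordLike record, String field) {
        return (Long) record.get(field);
    }

    // Typed against one concrete class: the cast fails for the other class.
    static long keyOfConcrete(Object record, String field) {
        return (Long) ((GenericDataRecordLike) record).get(field);
    }

    public static void main(String[] args) {
        GenericRecordLike generic = new GenericDataRecordLike().put("timestamp", 42L);
        GenericRecordLike specific = new OurAvroDataClassLike(7L);

        System.out.println(keyOf(generic, "timestamp"));  // 42
        System.out.println(keyOf(specific, "timestamp")); // 7

        try {
            keyOfConcrete(specific, "timestamp");
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

In the actual pipeline this would presumably mean declaring the MapFns over GenericRecord (for example MapFn<GenericRecord, Pair<Long, GenericRecord>>) and returning input.second() in "sort-post" rather than calling the GenericData.Record copy constructor; that translation is an untested assumption, not something confirmed in this thread.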

On Mon, Mar 16, 2015 at 10:53 AM, Mattijs Jonker <m.jonker@utwente.nl> wrote:
Thank you both, David and Josh, for responding so swiftly.

Yes, the generated OurAvroDataClass is a SpecificRecord, but what my code
snippet doesn't show is that OurCrunchToolClass is not meant exclusively
for OurAvroDataClass. As such, OurAvroDataClass cannot be hard-coded in
the MapFns in the way you proposed.

I have tried to work with specifics rather than generics by passing the type of OurAvroDataClass (as Class<R extends SpecificRecord>) down to OurCrunchToolClass, which is necessary for use in Avros.specifics(class). Taking this route, I end up with the unfortunate exception in [1].


[1]
org.apache.avro.generic.GenericData$Record cannot be cast to
org.apache.avro.specific.SpecificRecord
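Mattijs's guess in his original message, that the schema's name and namespace decide which class gets instantiated, would account for seeing a specific object where a generic one was expected and, when using specifics, the reverse. The Java sketch below is a simplified, hypothetical model of that kind of lookup and is not Avro's API: a resolver returns the generated class when one is registered under the schema's full name and otherwise falls back to a generic record. Which reader Crunch actually uses for a given PType is not settled in this thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical types for the model; none of these are real Avro classes.
interface RecordLike {}
class GenericRecordStandIn implements RecordLike {}
class OurAvroDataClassStandIn implements RecordLike {}

class Resolver {
    // Simulates "generated classes available on the classpath", keyed by schema full name.
    private final Map<String, Supplier<RecordLike>> classpath = new HashMap<>();

    void register(String fullName, Supplier<RecordLike> factory) {
        classpath.put(fullName, factory);
    }

    // Returns the generated class if one is registered for the schema's full
    // name; otherwise falls back to a generic record.
    RecordLike newRecord(String schemaFullName) {
        Supplier<RecordLike> factory = classpath.get(schemaFullName);
        return factory != null ? factory.get() : new GenericRecordStandIn();
    }
}

class ResolutionDemo {
    public static void main(String[] args) {
        Resolver withGenerated = new Resolver();
        withGenerated.register("namespace.OurAvroDataClass", OurAvroDataClassStandIn::new);
        Resolver withoutGenerated = new Resolver();

        RecordLike a = withGenerated.newRecord("namespace.OurAvroDataClass");
        RecordLike b = withoutGenerated.newRecord("namespace.OurAvroDataClass");
        System.out.println(a.getClass().getSimpleName()); // OurAvroDataClassStandIn
        System.out.println(b.getClass().getSimpleName()); // GenericRecordStandIn
    }
}
```

Under this model, only code typed against the common parent (RecordLike here) keeps working regardless of which branch is taken.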

On 16-03-15 17:30, Josh Wills wrote:
> David's version should fix the error; the problem is that your
> namespace.OurAvroDataClass isn't a subclass of GenericData.Record.
>
> On Mon, Mar 16, 2015 at 8:40 AM, David Ortiz <dpo5003@gmail.com> wrote:
>
>     Also, are you sure GenericData.Record is the correct class? I know
>     when I use Avro to build my records, they normally end up as a
>     SpecificRecord rather than a GenericRecord.
>
>     On Mon, Mar 16, 2015 at 11:38 AM, David Ortiz <dpo5003@gmail.com> wrote:
>
>         Mattijs,
>
>         Any particular reason you're taking that approach rather than
>         something like...
>
>
>         keyedStagedLogs = coalescedStagedLogs.parallelDo(
>             "sort-pre",
>             new MapFn<OurAvroType, Pair<Long, OurAvroType>>() {
>               private static final long serialVersionUID = 1L;
>
>               @Override
>               public Pair<Long, OurAvroType> map(OurAvroType input) {
>                 Long record_ts_key = (Long) input.get(partition_time_sourcename);
>                 return Pair.of(record_ts_key, input);
>               }
>             },
>             tf.pairs(tf.longs(), Avros.records(OurAvroType.class))
>         );
>
>         Thanks,
>         Dave
>
>
>         On Mon, Mar 16, 2015 at 11:35 AM, Mattijs Jonker <m.jonker@utwente.nl> wrote:
>
>             Hello,
>
>             I am trying to sort Avro data based on a given field in a Crunch
>             pipeline. Since the field in question (a timestamp) does not come
>             first in the Avro schema (and hence is not what primarily
>             determines the normal sort order), I first map each Record to a
>             Pair<Long, Record> so that Sort.sort orders on the desired field.
>             My code below [2] is loosely inspired by the first DoFn in [1].
>
>             Unfortunately, I encounter a ClassCastException [3] that I find
>             hard to solve on my own. I do not fully understand how types are
>             handled at runtime, but my guess is that, based on the name and
>             namespace in the schema, the first MapFn produces a
>             namespace.OurAvroDataClass object (which is a SpecificRecord).
>
>             I would appreciate it if somebody could hint at how to overcome
>             the exception. An alternative way to achieve this sorting is also
>             welcome.
>
>             Sincerely,
>
>             Mattijs
>
>
>             [1]
>             http://blog.cloudera.com/blog/2014/05/how-to-process-time-series-data-using-apache-crunch/
>
>             [2]
>
>             PTypeFamily tf = coalescedStagedLogs.getTypeFamily();
>             keyedStagedLogs = coalescedStagedLogs.parallelDo(
>                 "sort-pre",
>                 new MapFn<Record, Pair<Long, Record>>() {
>                   private static final long serialVersionUID = 1L;
>                   @Override
>                   public Pair<Long, Record> map(Record input) {
>                     Long record_ts_key =
>                         (Long) input.get(partition_time_sourcename);
>                     return Pair.of(record_ts_key, input);
>                   }
>                 },
>                 tf.pairs(tf.longs(), Avros.generics(schema))
>             );
>
>             sortedKeyedStagedLogs = Sort.sort(keyedStagedLogs,
>                 Sort.Order.ASCENDING); // Sort
>
>             sortedStagedLogs = sortedKeyedStagedLogs.parallelDo(
>                 "sort-post",
>                 new MapFn<Pair<Long, Record>, Record>() {
>                   private static final long serialVersionUID = 1L;
>                   @Override
>                   public Record map(Pair<Long, Record> input) {
>                     return new Record(input.second(), true);
>                   }
>                 },
>                 Avros.generics(schema)
>             );
>
>             [3]
>             Caused by: java.lang.ClassCastException:
>             namespace.OurAvroDataClass cannot be cast to
>             org.apache.avro.generic.GenericData$Record at
>             namespace.OurCrunchToolClass$2.map(OurCrunchToolClass.java:ln)
>
>             ln => return new Record(input.second(), true);
>
>
>
>
> --
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>
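The sorting approach in Mattijs's original message (key each record by its timestamp, sort on the key, then drop the key) is the classic decorate-sort-undecorate pattern. Below is a minimal in-memory Java sketch of it; `sortByLongKey` and `LogLine` are hypothetical names, and `List.sort` merely stands in for the distributed `Sort.sort(..., Sort.Order.ASCENDING)` step. Because the key extractor is just a function of the record, it can be written against whatever common record type the records share.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;

// Hypothetical record type whose timestamp is not its first field.
class LogLine {
    final String name;
    final long ts;

    LogLine(String name, long ts) {
        this.name = name;
        this.ts = ts;
    }
}

class SortByKeyDemo {
    // Decorate-sort-undecorate: pair each record with its key, sort the pairs
    // by key, then strip the keys again. The records themselves are not copied.
    static <R> List<R> sortByLongKey(List<R> records, ToLongFunction<R> keyFn) {
        List<Map.Entry<Long, R>> keyed = new ArrayList<>();
        for (R r : records) {
            keyed.add(new SimpleImmutableEntry<>(keyFn.applyAsLong(r), r)); // like "sort-pre"
        }
        keyed.sort(Map.Entry.comparingByKey()); // like Sort.sort(..., ASCENDING)
        return keyed.stream()
                .map(Map.Entry::getValue) // like "sort-post"
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<LogLine> logs = Arrays.asList(
                new LogLine("c", 30L), new LogLine("a", 10L), new LogLine("b", 20L));
        List<LogLine> sorted = sortByLongKey(logs, l -> l.ts);
        System.out.println(sorted.stream().map(l -> l.name).collect(Collectors.joining(","))); // a,b,c
    }
}
```

Note that the "undecorate" step here just returns the stored record; it does not rebuild it through a concrete-class constructor.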


