Subject: Re: Repeating Records w/ Spark + Avro?
From: Chris Miller
To: Peyman Mohajerian
Cc: user@avro.apache.org, user
Date: Sat, 12 Mar 2016 18:02:28 +0800

Well, I kind of got it... this works below:

*****************
val rdd = sc.newAPIHadoopFile(path,
    classOf[AvroKeyInputFormat[GenericRecord]],
    classOf[AvroKey[GenericRecord]],
    classOf[NullWritable])
  .map(_._1.datum)

rdd
  .map(record => {
    println(record.get("myValue"))
  })
  .take(10)
*****************

Seems strange to me that I have to iterate over the RDD effectively two
times -- once to create the RDD and again to perform my action. It also
seems strange that I can't actually access the data in my RDD until I've
copied the records. I would think this is a *very* common use case of an
RDD -- accessing the data it contains (otherwise, what's the point?).

Is there a way to always enable cloning? There used to be a cloneRecords
parameter on the hadoopFile method, but that seems to have been removed.

Finally, if I add rdd.persist(), then it doesn't work. I guess I would need
to do .map(_._1.datum) again before the map that does the real work.

--
Chris Miller

On Sat, Mar 12, 2016 at 4:15 PM, Chris Miller <cmiller11101@gmail.com> wrote:

> Wow! That sure is buried in the documentation! But yeah, that's what I
> thought more or less.
>
> I tried copying as follows, but that didn't work:
>
> *****************
> val copyRDD = singleFileRDD.map(_.copy())
> *****************
>
> When I iterate over the new copyRDD (foreach or map), I still have the
> same problem of duplicate records. I also tried copying within the block
> where I'm using it, but that didn't work either:
>
> *****************
> rdd
>   .take(10)
>   .map(item => {
>     val copied = item.copy()
>     val record = copied._1.datum()
>
>     println(record.get("myValue"))
>   })
> *****************
>
> What am I doing wrong?
>
> --
> Chris Miller
>
> On Sat, Mar 12, 2016 at 1:48 PM, Peyman Mohajerian <mohajeri@gmail.com> wrote:
>
>> Here is the reason for the behavior:
>>
>> '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable
>> object for each record, directly caching the returned RDD or directly
>> passing it to an aggregation or shuffle operation will create many
>> references to the same object. If you plan to directly cache, sort, or
>> aggregate Hadoop writable objects, you should first copy them using a map
>> function.
>>
>> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/SparkContext.html
>>
>> So it is Hadoop related.
>>
>> On Fri, Mar 11, 2016 at 3:19 PM, Chris Miller <cmiller11101@gmail.com> wrote:
>>
>>> I have a bit of a strange situation:
>>>
>>> *****************
>>> import org.apache.avro.generic.{GenericData, GenericRecord}
>>> import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper, AvroKey}
>>> import org.apache.avro.mapreduce.AvroKeyInputFormat
>>> import org.apache.hadoop.io.{NullWritable, WritableUtils}
>>>
>>> val path = "/path/to/data.avro"
>>>
>>> val rdd = sc.newAPIHadoopFile(path,
>>>     classOf[AvroKeyInputFormat[GenericRecord]],
>>>     classOf[AvroKey[GenericRecord]],
>>>     classOf[NullWritable])
>>> rdd.take(10).foreach( x => println( x._1.datum() ))
>>> *****************
>>>
>>> In this situation, I get the right number of records returned, and if I
>>> look at the contents of rdd I see the individual records as tuple2's...
>>> however, if I println on each one as shown above, I get the same result
>>> every time.
>>>
>>> Apparently this has to do with something in Spark or Avro keeping a
>>> reference to the item it's iterating over, so I need to clone the object
>>> before I use it. However, if I try to clone it (from the spark-shell
>>> console), I get:
>>>
>>> *****************
>>> rdd.take(10).foreach( x => {
>>>   val clonedDatum = x._1.datum().clone()
>>>   println(clonedDatum)
>>> })
>>>
>>> <console>:37: error: method clone in class Object cannot be accessed in
>>> org.apache.avro.generic.GenericRecord
>>>  Access to protected method clone not permitted because
>>>  prefix type org.apache.avro.generic.GenericRecord does not conform to
>>>  class $iwC where the access takes place
>>>         val clonedDatum = x._1.datum().clone()
>>> *****************
>>>
>>> So, how can I clone the datum?
>>>
>>> Seems I'm not the only one who ran into this problem:
>>> https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/102. I
>>> can't figure out how to fix it in my case without hacking away like the
>>> person in the linked PR did.
>>>
>>> Suggestions?
>>>
>>> --
>>> Chris Miller
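
For reference, below is a minimal sketch (not code from the thread itself) of
one common way to apply the copy-before-caching advice quoted above: deep-copy
each Avro datum inside a map using GenericData.deepCopy, which also sidesteps
the protected clone() restriction on GenericRecord. The file path and the
"myValue" field are placeholders carried over from the examples above:

*****************
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable

// Placeholder path, as in the thread above.
val path = "/path/to/data.avro"

// RDD of (AvroKey[GenericRecord], NullWritable) pairs; the underlying
// RecordReader reuses the same key/datum objects for every record.
val raw = sc.newAPIHadoopFile(path,
  classOf[AvroKeyInputFormat[GenericRecord]],
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable])

// Deep-copy each datum before caching so every element of the RDD holds
// its own record instead of a reference to the reused buffer.
val records = raw.map { case (key, _) =>
  val datum = key.datum()
  GenericData.get().deepCopy(datum.getSchema, datum)
}

records.persist()

// Extract a plain field value before take() so only small values are
// shipped back to the driver.
records.map(_.get("myValue").toString).take(10).foreach(println)
*****************

GenericData.deepCopy rebuilds the record field by field from its schema rather
than relying on Object.clone(), which is why it avoids the access error shown
above.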