spark-user mailing list archives

From Matei Zaharia <matei.zaha...@gmail.com>
Subject Re: when should I copy object coming out of RDD
Date Sat, 10 Aug 2013 04:07:25 GMT
What happens is that as we iterate through the SequenceFile, we reuse the same IntWritable
(or other Writable) instance for each record. So the rule is *copy a Writable object if you
expect to use the value after the next one is read*. For example, in take(10), the first element
is only looked at after all 10 elements have been read, which is too late. (Basically you're
getting back an array with ten references to the same Writable object; take a look by printing
it to stdout.) On the other hand, in the map() case, you call get() immediately after reading
that object, and before reading the next one, so it's fine.
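
The aliasing effect can be reproduced without Spark or Hadoop. The following is only a minimal
sketch: MutableLong is a hypothetical stand-in for a Writable, and reusingIterator is a
hypothetical stand-in for a record reader that recycles one instance. Neither is Spark's actual
implementation.

```scala
// Hypothetical stand-in for a Hadoop Writable: a mutable box around a Long.
final class MutableLong(private var value: Long) {
  def get(): Long = value
  def set(v: Long): Unit = { value = v }
}

object WritableReuseDemo {
  // Mimics a reader that hands back the same holder object for every record.
  def reusingIterator(values: Seq[Long]): Iterator[MutableLong] = {
    val shared = new MutableLong(0L)
    values.iterator.map { v => shared.set(v); shared }
  }

  // Keeps the references first and reads them afterwards, like take(n):
  // every element is the same object, so only the last value is visible.
  def heldReferences(values: Seq[Long]): List[Long] =
    reusingIterator(values).toList.map(_.get())

  // Calls get() on each record before the next one is read, like map():
  // each value is copied out while it is still current.
  def copiedValues(values: Seq[Long]): List[Long] =
    reusingIterator(values).map(_.get()).toList

  def main(args: Array[String]): Unit = {
    val data = Seq(1L, 2L, 3L)
    println(heldReferences(data).mkString(","))  // prints 3,3,3
    println(copiedValues(data).mkString(","))    // prints 1,2,3
  }
}
```

The only difference between the two methods is whether get() runs before or after the iterator
advances, which is the same distinction as between the map() and take(10) cases above.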

This is definitely somewhat confusing, but it's just an optimization we made because in most
cases you use the object right away and don't need to allocate another Writable. So as another
general rule, if you want to keep a value around longer, convert it from a Writable to a
"normal" Java type (for example, by calling get()). Really it's take() and collect() that will
be the most confusing.

Matei

On Aug 9, 2013, at 2:47 PM, Ameet Kini <ameetkini@gmail.com> wrote:

> When iterating over a HadoopRDD created using SparkContext.sequenceFile, I noticed that
> if I don't copy the key as below, every tuple in the RDD has the same value as the last one
> seen. Clearly the object is being recycled, so if I don't clone the object, I'm in trouble.
> 
> Say my sequence files had keys of type LongWritable:
> 
> val hadoopRdd = sc.sequenceFile(..)
> val filteredRdd = hadoopRdd.filter(..)
> 
> Now if I run the code below to print 10 keys of type Long, I see the same value printed
> 10 times:
> filteredRdd.take(10).foreach(t => println(t._1.get()))
> 
> Now if I copy the key out, it prints the 10 unique keys correctly:
> val hadoopRdd = sc.sequenceFile(..)
> val mappedRdd = hadoopRdd.map(t => (t._1.get(), t._2))
> val filteredRdd = mappedRdd.filter(..)
> filteredRdd.take(10).foreach(t => println(t._1))
> 
> When are users expected to make such copies of objects when performing RDD operations?
> 
> Ameet

