spark-dev mailing list archives

From "Xia, Junluan" <>
Subject RE: SPARK-942
Date Tue, 12 Nov 2013 01:14:50 GMT
Hi Kyle

I totally agree with you. The 'best' solution for now is to handle only the DISK_ONLY scenario
and pass the iterator directly to the BlockManager.

It would be too expensive to complicate the code for a 0.1% case before we have a
perfect solution.

-----Original Message-----
From: Kyle Ellrott [] 
Sent: Tuesday, November 12, 2013 6:28 AM
Subject: Re: SPARK-942

The problem is that the iterator interface only defines the 'hasNext' and 'next' methods, so I
don't think there is really any way to estimate the total count until the iterator has
finished traversing. In my particular case, I'm wrapping an OpenRDF RIO iterator that is parsing
a gzip file stream, and one of the files just happens to be several gigabytes large.
The individual elements produced by the iterator are all the same; it's just that sometimes
it produces a few million more than normal.

It's not a normal occurrence. 99.9% of the time, when people call 'flatMap',
they will probably be producing arrays that fit nicely into memory. Doing a bunch of
extra bookkeeping (i.e. unrolling the iterator one element at a time and checking whether it has
gotten too big yet) may make the code much more complicated
while only providing a solution for extreme edge cases.
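
For reference, the extra bookkeeping described above could look roughly like the following. This is only a sketch under stated assumptions: 'BoundedUnroll', 'unroll' and the caller-supplied 'sizeOf' estimator are hypothetical names, not existing Spark code, and a real version would need a trustworthy way to estimate object sizes.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper, not part of Spark: buffers elements from `values`
// until the estimated size exceeds `maxMem` bytes.
// `sizeOf` is a caller-supplied size estimator (an assumption of this sketch).
object BoundedUnroll {
  def unroll[A](values: Iterator[A], maxMem: Long)(sizeOf: A => Long)
      : Either[Iterator[A], ArrayBuffer[A]] = {
    val buffer = new ArrayBuffer[A]
    var used = 0L
    // Pull one element at a time and track the running size estimate.
    while (values.hasNext && used <= maxMem) {
      val v = values.next()
      used += sizeOf(v)
      buffer += v
    }
    if (used > maxMem) {
      // Budget exceeded: replay what was buffered, then continue lazily
      // with the elements that were never unrolled.
      Left(buffer.iterator ++ values)
    } else {
      // Everything fit within the budget: hand back the full buffer.
      Right(buffer)
    }
  }
}
```

A 'Left' result means the caller should stream the returned iterator to disk; a 'Right' result means the buffer can be cached in memory as usual.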

I think the 'best' way to go would be to leave the 'MEMORY_ONLY' and 'MEMORY_AND_DISK' behaviors
the same. If users know that their code could produce these 'mega-iterators', then they
pass 'DISK_ONLY' and the iterator gets passed straight to the BlockManager to be written
straight to disk. Then all we have to do is change "def put(blockId: BlockId, values:
Iterator[Any], level: StorageLevel, tellMaster: Boolean)"
(BlockManager.scala:452) to call 'diskStore.putValues' directly, rather than unrolling the
iterator and passing it on to the standard 'doPut' like it does now.
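
To illustrate the DISK_ONLY idea, here is a minimal sketch of writing an iterator to a file one element at a time. It assumes plain Java serialization rather than Spark's own serializer; 'StreamToDisk' and 'writeValues' are hypothetical names, and this is not how 'diskStore.putValues' is actually implemented.

```scala
import java.io.{BufferedOutputStream, File, FileOutputStream, ObjectOutputStream}

// Hypothetical helper, not Spark's DiskStore: writes each element as it is
// produced, so the whole iterator is never held in memory at once.
object StreamToDisk {
  // Returns the number of elements written.
  def writeValues(file: File, values: Iterator[Any]): Long = {
    val out = new ObjectOutputStream(
      new BufferedOutputStream(new FileOutputStream(file)))
    var count = 0L
    try {
      values.foreach { v =>
        out.writeObject(v.asInstanceOf[AnyRef])
        count += 1
        // ObjectOutputStream remembers every object it has written;
        // reset() clears that table so old elements can be collected.
        if (count % 10000 == 0) out.reset()
      }
    } finally {
      out.close()
    }
    count
  }
}
```

Because only the current element (plus the stream's buffer) is live at any time, memory use does not grow with the number of elements the iterator produces.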


On Mon, Nov 11, 2013 at 5:18 AM, Xia, Junluan <> wrote:

> Hi
> I think it is a bad user experience to throw an OOM exception when the user
> only persists the RDD with DISK_ONLY or MEMORY_AND_DISK.
> As Kyle mentioned below, the key point is that CacheManager unrolls
> the whole iterator into an ArrayBuffer without checking free memory. We
> should estimate the size of the unrolled iterator and check whether it
> exceeds the current free memory.
> We could separate this into three scenarios:
> 1. For MEMORY_ONLY, I think it is normal to throw an OOM exception
>    and require the user to adjust the application.
> 2. For MEMORY_AND_DISK, we should check whether free memory can hold the
>    unrolled ArrayBuffer. If yes, it follows the usual path; if no, we
>    degrade it to DISK_ONLY.
> 3. For DISK_ONLY, I think we need not unroll the whole iterator into an
>    ArrayBuffer, because we could write the iterator to disk one element
>    at a time.
> So the issue is how to judge whether free memory can hold the
> unrolled iterator before it becomes an ArrayBuffer.
> Is there any solution for this case? Could we just unroll the first 10% of
> the iterator into an ArrayBuffer, estimate its size, and take the total
> size to be 10 * the size of that 10%? Apparently it is not perfect.
> -----Original Message-----
> From: Kyle Ellrott []
> Sent: Thursday, November 07, 2013 2:59 AM
> To:
> Subject: Re: SPARK-942
> I think the usage has to be calculated as the iterator is being put
> into the ArrayBuffer.
> Right now, the BlockManager, in its put method, when it gets an
> iterator named 'values', uses the simple stanza of:
> def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel,
>     tellMaster: Boolean): Long = {
>   val elements = new ArrayBuffer[Any]
>   elements ++= values
>   put(blockId, elements, level, tellMaster)
> }
> Completely unrolling the iterator in a single line.  Above it, the 
> CacheManager does the exact same thing with:
> val elements = new ArrayBuffer[Any]
> elements ++= computedValues
> blockManager.put(key, elements, storageLevel, tellMaster = true)
> We would probably have to implement some sort of 'IteratorBuffer'
> class, which would wrap an iterator. It would include a method to
> unroll the iterator into a buffer up to a point, something like
> def unroll(maxMem: Long): Boolean = { ... }
> It would return true if maxMem was hit, at which point
> BlockManager could read through the already cached values, then
> continue on through the rest of the iterator, dumping all the values
> to file. If it unrolled without hitting maxMem (which would probably
> be most of the time), the class would simply wrap the ArrayBuffer of cached values.
> Kyle
> On Sun, Nov 3, 2013 at 12:50 AM, Reynold Xin <> wrote:
> > It's not a very elegant solution, but one possibility is for the
> > CacheManager to check whether it will have enough space. If it is
> > running out of space, it skips buffering the output of the iterator and
> > writes the output directly to disk (if the storage level
> > allows that).
> >
> > But it is still tricky to know whether we will run out of space
> > before we even start running the iterator. One possibility is to use
> > sizing data from previous partitions to estimate the size of the
> > current partition (i.e.
> > estimated in memory size = avg of current in-memory size / current
> > input size).
> >
> > Do you have any ideas on this one, Kyle?
> >
> >
> > On Sat, Oct 26, 2013 at 10:53 AM, Kyle Ellrott <> wrote:
> >
> > > I was wondering if anybody had any thoughts on the best way to
> > > tackle SPARK-942 ( ).
> > > Basically, Spark takes an iterator from a flatMap call and, because
> > > I tell it that it needs to persist, proceeds to push it all
> > > into an array before deciding that it doesn't have enough memory
> > > and trying to serialize
> > > it to disk, and somewhere along the line it runs out of memory.
> > > For my particular operation, the function returns an iterator that
> > > reads data out of a file, and the size of the files passed to that
> > > function can vary greatly (from a few kilobytes to a few gigabytes).
> > > The funny thing is that
> > > if I do a straight 'map' operation after the flatMap, everything
> > > works, because Spark just passes the iterator forward and never
> > > tries to expand the whole thing into memory. But I need to do a
> > > reduceByKey across all the records, so I'd like to persist to disk
> > > first, and that is where I hit this
> > > snag.
> > > I've already set up a unit test to replicate the problem, and I
> > > know the area of the code that would need to be fixed.
> > > I'm just hoping for some tips on the best way to fix the problem.
> > >
> > > Kyle
> > >
> >
