spark-user mailing list archives

From Sean Owen <so...@cloudera.com>
Subject Re: How to tell if one RDD depends on another
Date Thu, 26 Feb 2015 23:58:05 GMT
Yeah, I believe Corey knows that much and is using foreachPartition(i
=> None) to materialize. The question is, how would you do this with
an arbitrary DAG? In this simple example we know what the answer is,
but he's trying to do it programmatically.
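
For the arbitrary-DAG case, a minimal sketch (one possible approach, not
something Spark does for you) is to walk the lineage through the public
RDD.dependencies and RDD.getStorageLevel methods. The names Lineage,
ancestors, dependsOn and persistedAncestors are hypothetical helpers made up
for illustration:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable

// Hypothetical helper object; not part of Spark.
object Lineage {

  /** Every RDD reachable from `rdd` through its dependencies, excluding `rdd` itself. */
  def ancestors(rdd: RDD[_]): Seq[RDD[_]] = {
    // Keyed by RDD id so a parent shared by several branches is visited once.
    val seen = mutable.LinkedHashMap.empty[Int, RDD[_]]
    def visit(r: RDD[_]): Unit =
      r.dependencies.foreach { dep =>
        val parent = dep.rdd
        if (!seen.contains(parent.id)) {
          seen(parent.id) = parent
          visit(parent)
        }
      }
    visit(rdd)
    seen.values.toSeq
  }

  /** True if `candidate` appears anywhere in the lineage of `child`. */
  def dependsOn(child: RDD[_], candidate: RDD[_]): Boolean =
    ancestors(child).exists(_.id == candidate.id)

  /** Ancestors that have been marked for persistence (storage level != NONE). */
  def persistedAncestors(rdd: RDD[_]): Seq[RDD[_]] =
    ancestors(rdd).filter(_.getStorageLevel != StorageLevel.NONE)
}
```

With something like this, Lineage.persistedAncestors(rdd2) would give the
RDDs to run the no-op action on before submitting rdd2's job. The recursion
is not stack-safe for very deep lineages, and this only reports what the
driver knows about the graph; it does not make concurrent jobs wait on
partitions that are still being computed.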

On Thu, Feb 26, 2015 at 11:54 PM, Zhan Zhang <zzhang@hortonworks.com> wrote:
> In this case, it is slow to wait for rdd1.saveAsHadoopFile(...) to finish,
> probably due to writing to HDFS. A workaround for this particular case may
> be as follows.
>
> val rdd1 = ......cache()
>
> val rdd2 = rdd1.map().....()
> rdd1.count  // materializes rdd1 in the cache before the saves start
> future { rdd1.saveAsHadoopFile(...) }
> future { rdd2.saveAsHadoopFile(…) }
>
> In this way, rdd1 will be calculated once, and the two saveAsHadoopFile
> calls will run concurrently.
>
> Thanks.
>
> Zhan Zhang
>
>
>
> On Feb 26, 2015, at 3:28 PM, Corey Nolet <cjnolet@gmail.com> wrote:
>
>> What confused me is the statement "The final result is that rdd1 is
>> calculated twice." Is it the expected behavior?
>
> To be perfectly honest, performing an action on a cached RDD in two
> different threads and having them block (at the partition level) until the
> parents are cached is the behavior that I and all my coworkers
> expected.
>
> On Thu, Feb 26, 2015 at 6:26 PM, Corey Nolet <cjnolet@gmail.com> wrote:
>>
>> I should probably mention that my example case is much oversimplified.
>> Let's say I've got a tree, a fairly complex one, where I begin a series of
>> jobs at the root which calculate a bunch of really complex joins, and
>> as I move down the tree, I'm creating reports from the data that's already
>> been joined (I've implemented logic to determine when cached items can be
>> cleaned up, e.g. when the last report has been done in a subtree).
>>
>> My issue is that the 'actions' on the RDDs are currently being run
>> in a single thread. Even if I had to wait on a cache to complete fully before
>> running the "children" jobs, I'd still be in a better place than I am now
>> if I could run those jobs concurrently. Right now this is not the case.
>>
>> > What you want is for a request for partition X to wait if partition X is
>> > already being calculated in a persisted RDD.
>>
>> I totally agree and if I could get it so that it's waiting at the
>> granularity of the partition, I'd be in a much much better place. I feel
>> like I'm going down a rabbit hole and working against the Spark API.
>>
>>
>> On Thu, Feb 26, 2015 at 6:03 PM, Sean Owen <sowen@cloudera.com> wrote:
>>>
>>> To distill this a bit further, I don't think you actually want rdd2 to
>>> wait on rdd1 in this case. What you want is for a request for
>>> partition X to wait if partition X is already being calculated in a
>>> persisted RDD. Otherwise the first partition of rdd2 waits on the
>>> final partition of rdd1 even when the rest is ready.
>>>
>>> That is probably a good idea in almost all cases. I don't know how
>>> hard it is to implement, but I speculate that it's easier to deal
>>> with it at that level than as a function of the dependency graph.
>>>
>>> On Thu, Feb 26, 2015 at 10:49 PM, Corey Nolet <cjnolet@gmail.com> wrote:
>>> > I'm trying to do the scheduling myself now- to determine that rdd2
>>> > depends on rdd1 and rdd1 is a persistent RDD (storage level != None)
>>> > so that I can do the no-op on rdd1 before I run rdd2. I would much
>>> > rather the DAG figure this out so I don't need to think about all this.
>>
>>
>
>
