spark-dev mailing list archives

From Christopher Nguyen <...@adatao.com>
Subject Re: "Dynamic variables" in Spark
Date Tue, 22 Jul 2014 00:54:09 GMT
Hi Neil, first off, I'm generally a sympathetic advocate for making changes
to Spark internals to make it easier/better/faster/more awesome.

In this case, I'm (a) not clear about what you're trying to accomplish, and
(b) a bit worried about the proposed solution.

On (a): you state that you want to pass some Accumulators around. Yet
the proposed solution is a "shared" variable that may be set, "mapped out",
and possibly "reduced back", but without any accompanying accumulation
semantics. At the same time, it doesn't seem like you want only the
broadcast property. Can you clarify the problem statement with some
before/after client code examples?
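
To make the request concrete, here is a minimal sketch of the kind of
before/after comparison I have in mind. It is plain Scala with no Spark
dependency. Metrics, parse and process are hypothetical names, and an
AtomicLong stands in for an Accumulator, so this only illustrates how the
signatures change, not how values travel across a cluster.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.util.DynamicVariable

// Hypothetical stand-in for a bundle of Accumulators.
final class Metrics { val records = new AtomicLong(0) }

object Before {
  // Every method on the call path carries the context explicitly.
  def parse(line: String, m: Metrics): Int = {
    m.records.incrementAndGet()
    line.length
  }
  def process(lines: Seq[String], m: Metrics): Int =
    lines.map(line => parse(line, m)).sum
}

object WithImplicits {
  // Call sites get lighter, but each signature still has to change.
  def parse(line: String)(implicit m: Metrics): Int = {
    m.records.incrementAndGet()
    line.length
  }
  def process(lines: Seq[String])(implicit m: Metrics): Int =
    lines.map(line => parse(line)).sum
}

object After {
  // Ambient, thread-scoped value: signatures are untouched, but the
  // dependency on Metrics is no longer visible in them.
  val current = new DynamicVariable[Option[Metrics]](None)
  def parse(line: String): Int = {
    current.value.foreach(_.records.incrementAndGet())
    line.length
  }
  def process(lines: Seq[String]): Int = lines.map(parse).sum
}

object SignatureDemo {
  // Runs the same three lines through each variant and returns the counts.
  def run(): (Long, Long, Long) = {
    val lines = Seq("a", "bb", "ccc")
    val m1 = new Metrics
    Before.process(lines, m1)
    val m2 = new Metrics
    WithImplicits.process(lines)(m2)
    val m3 = new Metrics
    After.current.withValue(Some(m3)) { After.process(lines) }
    (m1.records.get, m2.records.get, m3.records.get)
  }
}
```

All three variants count the same records; the only difference is where
the Metrics dependency shows up in the code.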

On (b): you're right that adding variables to SparkContext should be done
with caution, as it may have unintended consequences beyond just serdes
payload size. For example, there is a stated intention of supporting
multiple SparkContexts in the future, and this proposed solution can make
it a bigger challenge to do so. Indeed, we had a gut-wrenching call to make
a while back on a subject related to this (see
https://github.com/mesos/spark/pull/779). Furthermore, even in a single
SparkContext application, there may be multiple "clients" (of that
application) whose intent to use the proposed "SparkDynamic" would not
necessarily be coordinated.
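
The coordination concern can be sketched as follows. FakeContext is a
hypothetical stand-in for a context with a single settable slot, which is
my assumption about how setDynamicVariableValue would behave; it is not
Spark code.

```scala
// Hypothetical stand-in for a context that holds one application-wide slot.
class FakeContext {
  @volatile private var dynamicValue: Any = null
  def setDynamicVariableValue(value: Any): Unit = { dynamicValue = value }
  def currentDynamicValue: Option[Any] = Option(dynamicValue)
}

object SharedSlotDemo {
  // Returns what is visible after client A sets the slot, and again after
  // an unrelated client B sets it.
  def run(): (Option[Any], Option[Any]) = {
    val sc = new FakeContext
    // Client A (say, a metrics library) installs its value.
    sc.setDynamicVariableValue("metrics-for-client-A")
    val seenAfterA = sc.currentDynamicValue
    // Client B, unaware of A, installs its own value in the same slot.
    sc.setDynamicVariableValue("metrics-for-client-B")
    // Anything A submits from here on would observe B's value.
    val seenAfterB = sc.currentDynamicValue
    (seenAfterA, seenAfterB)
  }
}
```

With one untyped slot per context, the last writer wins, and neither
client can tell that the other has replaced its value.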

So, considering a ratio of a/b (benefit/cost), it's not clear to me that
the benefits are significant enough to warrant the costs. Do I
misunderstand that the benefit is to save one explicit parameter (the
"context") in the signature/closure code?

--
Christopher T. Nguyen
Co-founder & CEO, Adatao <http://adatao.com>
linkedin.com/in/ctnguyen



On Mon, Jul 21, 2014 at 2:10 PM, Neil Ferguson <nferguson@gmail.com> wrote:

> Hi all
>
> I have been adding some metrics to the ADAM project
> https://github.com/bigdatagenomics/adam, which runs on Spark, and have a
> proposal for an enhancement to Spark that would make this work cleaner and
> easier.
>
> I need to pass some Accumulators around, which will aggregate metrics
> (timing stats and other metrics) across the cluster. However, it is
> cumbersome to have to explicitly pass some "context" containing these
> accumulators around everywhere that might need them. I can use Scala
> implicits, which help slightly, but I'd still need to modify every method
> in the call stack to take an implicit variable.
>
> So, I'd like to propose that we add the ability to have "dynamic variables"
> (basically thread-local variables) to Spark. This would avoid having to
> pass the Accumulators around explicitly.
>
> My proposed approach is to add a method to the SparkContext class as
> follows:
>
> /**
>  * Sets the value of a "dynamic variable". This value is made available to jobs
>  * without having to be passed around explicitly. During execution of a Spark job
>  * this value can be obtained from the [[SparkDynamic]] object.
>  */
> def setDynamicVariableValue(value: Any)
>
> Then, when a job is executing, the SparkDynamic object can be accessed to
> obtain the value of the dynamic variable. The implementation of this object
> is as follows:
>
> import scala.util.DynamicVariable
>
> object SparkDynamic {
>   // DynamicVariable takes an initial value; null is reported as None below.
>   private val dynamicVariable = new DynamicVariable[Any](null)
>   /**
>    * Gets the value of the "dynamic variable" that has been set in the [[SparkContext]]
>    */
>   def getValue: Option[Any] = {
>     Option(dynamicVariable.value)
>   }
>   // Binds the value on the current thread for the duration of thunk.
>   private[spark] def withValue[S](threadValue: Option[Any])(thunk: => S): S = {
>     dynamicVariable.withValue(threadValue.orNull)(thunk)
>   }
> }
>
> The change involves modifying the Task object to serialize the value of the
> dynamic variable, and modifying the TaskRunner class to deserialize the
> value and make it available in the thread that is running the task (using
> the SparkDynamic.withValue method).
>
> I have done a quick prototype of this in this commit:
>
> https://github.com/nfergu/spark/commit/8be28d878f43ad6c49f892764011ae7d273dcea6
> and it seems to work fine in my (limited) testing. It needs more testing,
> tidy-up and documentation though.
>
> One drawback is that the dynamic variable will be serialized for every Task
> whether it needs it or not. For my use case this might not be too much of a
> problem, as serializing and deserializing Accumulators looks fairly
> lightweight -- however we should certainly warn users against setting a
> dynamic variable containing lots of data. I thought about using broadcast
> tables here, but I don't think it's possible to put Accumulators in a
> broadcast table (as I understand it, they're intended for purely read-only
> data).
>
> What do people think about this proposal? My use case aside, it seems like
> it would be a generally useful enhancement to be able to pass certain data
> around without having to explicitly pass it everywhere.
>
> Neil
>
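
The mechanism described in the quoted proposal (ship a value with each
task, then bind it on the thread that runs the task) can be sketched
without Spark. Dyn, FakeTask and TaskRunnerDemo are hypothetical names,
and a single-thread executor stands in for the task runner; the real
change would also have to serialize the value, which this sketch skips.

```scala
import java.util.concurrent.{Callable, Executors}
import scala.util.DynamicVariable

// Simplified stand-in for the proposed SparkDynamic object.
object Dyn {
  private val dynamicVariable = new DynamicVariable[Any](null)
  def getValue: Option[Any] = Option(dynamicVariable.value)
  def withValue[S](threadValue: Option[Any])(thunk: => S): S =
    dynamicVariable.withValue(threadValue.orNull)(thunk)
}

// Hypothetical task: carries the shipped value alongside the work to run.
final case class FakeTask[T](shipped: Option[Any], body: () => T)

object TaskRunnerDemo {
  // Returns the value seen on the submitting thread, inside the task,
  // and on the worker thread after the task has finished.
  def run(): (Option[Any], Option[Any], Option[Any]) = {
    val pool = Executors.newSingleThreadExecutor()
    try {
      val task = FakeTask(Some("timers"), () => Dyn.getValue)
      // "Executor" side: bind the shipped value, run the body, unbind.
      val inside = pool.submit(new Callable[Option[Any]] {
        def call(): Option[Any] = Dyn.withValue(task.shipped)(task.body())
      }).get()
      // Same worker thread, once the binding has gone out of scope.
      val afterwards = pool.submit(new Callable[Option[Any]] {
        def call(): Option[Any] = Dyn.getValue
      }).get()
      (Dyn.getValue, inside, afterwards)
    } finally {
      pool.shutdown()
    }
  }
}
```

The binding is visible only while the task body runs: withValue restores
the previous value when the thunk returns, so a reused worker thread does
not leak one task's value into the next.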
