spark-reviews mailing list archives

From srowen <>
Subject [GitHub] spark pull request: [SPARK-4423] Improve foreach() documentation t...
Date Mon, 02 Mar 2015 13:44:19 GMT
Github user srowen commented on a diff in the pull request:
    --- Diff: docs/ ---
    @@ -728,6 +728,69 @@ def doStuff(self, rdd):
    +### Understanding closures <a name="ClosuresLink"></a>
    +One of the harder things about Spark is understanding the scope and life cycle of variables
and methods when executing code across a cluster. RDD operations that modify variables outside
of their scope can be a frequent source of confusion. In the example below we'll look at code
that uses `foreach()` to increment a counter, but similar issues can occur for other operations
as well.
    +#### Example
    +Consider the naive RDD element sum below, which behaves completely differently when running Spark in `local` mode (e.g. via the shell) and when deploying a Spark application to a cluster (e.g. via `spark-submit` to YARN):
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +{% highlight scala %}
    +var counter = 0
    +var rdd = sc.parallelize(data)
    +// Wrong: Don't do this!!
    +rdd.foreach(x => counter += x)
    +println("Counter value: " + counter)
    +{% endhighlight %}
    +</div>
    +<div data-lang="java"  markdown="1">
    +{% highlight java %}
    +int counter = 0;
    +JavaRDD<Integer> rdd = sc.parallelize(data);
    +// Wrong: Don't do this!!
    +rdd.foreach(x -> counter += x);
    +System.out.println("Counter value: " + counter);
    +{% endhighlight %}
    +</div>
    +<div data-lang="python"  markdown="1">
    +{% highlight python %}
    +counter = 0
    +rdd = sc.parallelize(data)
    +# Wrong: Don't do this!!
    +def increment_counter(x):
    +    global counter
    +    counter += x
    +rdd.foreach(increment_counter)
    +print("Counter value: " + str(counter))
    +{% endhighlight %}
    +</div>
    +</div>
    +#### Local vs. cluster modes
    +In `local` mode, the above code will correctly sum the values within the RDD and store the result in **counter**. This is because both the RDD and the variable **counter** are in the same memory on the driver node.
    +However, in `cluster` mode, what happens is more complicated, and the above code will not work correctly. To execute jobs, Spark breaks up the processing of RDD operations into tasks, each of which is executed by an executor. Prior to execution, Spark computes the **closure**. The closure is those variables and methods which must be visible for the executor to perform its computations on the RDD (in this case `foreach()`). This closure is serialized and sent to each executor. In `local` mode there is only one executor, so everything shares the same closure. In `cluster` mode, however, this is not the case: the executors running on separate worker nodes each have their own copy of the closure.
    +The problem here is that the variables within the closure sent to each executor are now copies, and thus, when **counter** is referenced within the `foreach` function, it's no longer the **counter** on the driver node. There is still a **counter** in the memory of the driver node, but this is no longer visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of **counter** will still be zero, since all operations on **counter** were referencing the value within the serialized closure.
    +The one exception to this is when the variable being modified is an [`Accumulator`](#AccumLink).
Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable
when execution is split up across worker nodes in a cluster. The Accumulators section of this
guide discusses these in more detail.  
    +In general, closures (constructs like loops or locally defined methods) should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in `local` mode, but that's just by accident, and such code will not behave as expected in distributed mode. Use an accumulator instead if some global aggregation is needed.
    +#### Printing elements of an RDD 
    +Another common idiom is attempting to print out the elements of an RDD using `rdd.foreach(println)` or `rdd.map(println)`. Intuitively, it seems that this should work. But again, consider that in `cluster` mode the `stdout` being written to by the executors is the executors' own `stdout`, not the one on the driver! Consequently, Spark will start writing to `stdout` on the worker nodes, potentially filling up `/tmp` storage rapidly. To avoid this, one can use the `collect()` method to first bring the RDD to the driver node, thus: `rdd.collect().foreach(println)`. This can cause the driver to run out of memory, though, because collect() fetches the entire RDD to a single machine; a safer approach is to use the `take()` method to only get a few elements of the RDD: `rdd.take(100).foreach(println)`.
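    +As a small sketch of the two driver-side approaches (assuming an existing RDD `rdd`):
    +{% highlight scala %}
    +// Brings the entire RDD to the driver; only safe when the RDD is small.
    +rdd.collect().foreach(println)
    +// Fetches just the first 100 elements; safer when the RDD may be large.
    +rdd.take(100).foreach(println)
    +{% endhighlight %}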
    --- End diff ---
    Mark `collect()` as code font at the end here. It is worth noting that `collect()` brings
all data to the driver, and if the RDD is large, could cause the driver to run out of memory.
Yes, `take()` is a better idea, but I suppose only if you really just want to print a few
elements. It's not the same as `collect()` of course. Might be worth phrasing that way ...
"If you only need to print a few items from the RDD..."
