spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
Subject [10/13] git commit: Add docs for standalone scheduler fault tolerance
Date Fri, 11 Oct 2013 00:17:17 GMT
Add docs for standalone scheduler fault tolerance

Also fix a couple HTML/Markdown issues in other files.


Branch: refs/heads/master
Commit: 4ea8ee468fb1f50fce56853a5127a89efc45b706
Parents: 749233b
Author: Aaron Davidson <>
Authored: Tue Oct 8 14:18:31 2013 -0700
Committer: Aaron Davidson <>
Committed: Tue Oct 8 14:18:31 2013 -0700

 docs/            | 45 ++++++++++++++++++++++++++++++++
 docs/ |  5 ++--
 docs/                      |  2 +-
 3 files changed, 48 insertions(+), 4 deletions(-)
diff --git a/docs/ b/docs/
index 81cdbef..5707e19 100644
--- a/docs/
+++ b/docs/
@@ -3,6 +3,9 @@ layout: global
 title: Spark Standalone Mode
+* This will become a table of contents (this text will be scraped).
 In addition to running on the Mesos or YARN cluster managers, Spark also provides a simple
standalone deploy mode. You can launch a standalone cluster either manually, by starting a
master and workers by hand, or use our provided [launch scripts](#cluster-launch-scripts).
It is also possible to run these daemons on a single machine for testing.
 # Installing Spark Standalone to a Cluster
@@ -169,3 +172,45 @@ In addition, detailed log output for each job is also written to the
work direct
 You can run Spark alongside your existing Hadoop cluster by just launching it as a separate
service on the same machines. To access Hadoop data from Spark, just use a hdfs:// URL (typically
`hdfs://<namenode>:9000/path`, but you can find the right URL on your Hadoop Namenode's
web UI). Alternatively, you can set up a separate cluster for Spark, and still have it access
HDFS over the network; this will be slower than disk-local access, but may not be a concern
if you are still running in the same local area network (e.g. you place a few Spark machines
on each rack that you have Hadoop on).
+# Fault tolerance
+By default, standalone scheduling clusters are resilient to Worker failures (insofar as Spark
itself is resilient to losing work by moving it to other workers). However, the scheduler
uses a Master to make scheduling decisions, and this (by default) creates a single point of
failure: if the Master crashes, no new jobs can be created. In order to circumvent this, we
have two fault tolerance schemes, detailed below.
+**Possible gotcha:** Production-level fault tolerance is enabled by having multiple Master
nodes tied together with ZooKeeper. If you have multiple Masters in your cluster but fail
to correctly configure the Masters to use ZooKeeper, the Masters will fail to discover each
other and think they're all leaders. This will not lead to a healthy cluster state (as all
Masters will start scheduling independently).
+## Standby masters with ZooKeeper
+    # May be configured as SPARK_DAEMON_JAVA_OPTS in
+    spark.deploy.recoveryMode=ZOOKEEPER
+    spark.deploy.zookeeper.url=ZK_URL:ZK_PORT # eg
+    spark.deploy.zookeeper.dir=/spark # OPTIONAL! /spark is the default.
+Utilizing ZooKeeper to provide leader election and some state storage, you can launch multiple
Masters in your cluster connected to the same ZooKeeper instance. One will be elected "leader"
and the others will remain in standby mode. If the current leader dies, another Master will
be elected, recover the old Master's state, and then resume scheduling. The entire recovery
process (from the time the the first leader goes down) should take between 1 and 2 minutes.
Note that this delay only affects scheduling _new_ jobs -- jobs that were already running
during Master failover are unaffected.
+In order to schedule new jobs or add Workers to the cluster, they need to know the IP address
of the current leader. This can be accomplished by simply passing in a list of Masters where
you used to pass in a single one. For example, you might start your SparkContext pointing
to ``spark://host1:port1,host2:port2``. This would cause your SparkContext to try registering
with both Masters -- if host1 goes down, this configuration would still be correct as we'd
find the new leader, host2.
+There's an important distinction to be made between "registering with a Master" and normal
operation. When starting up, a SparkContext or Worker needs to be able to find and register
with the current lead Master. Once it successfully registers, though, it is "in the system"
(i.e., stored in ZooKeeper). If failover occurs, the new leader will contact all previously
registered SparkContexts and Workers to inform them of the change in leadership, so they need
not have even known of the existence of the new Master at startup.
+<br />This means that new Masters can be created at any time, and the only thing you
need to worry about is that _new_ SparkContexts and Workers can find it to register with in
case it becomes the leader. Once registered, you're taken care of.
+## Single-node recovery with local file system
+    # May be configured as SPARK_DAEMON_JAVA_OPTS in
+    spark.deploy.recoveryMode=FILESYSTEM
+    spark.deploy.recoveryDirectory=PATH_ACCESSIBLE_TO_MASTER
+ZooKeeper is the best way to go for production-level fault tolerance, but if you just want
to be able to restart the Master if it goes down, FILESYSTEM mode can take care of it. When
SparkContexts and Workers register, they have enough state written to the provided directory
so that they can be recovered upon a restart of the Master process.
+* This solution can be used in tandem with a process monitor/manager like [monit](,
or just to enable manual recovery via restart.
+* While filesystem recovery seems straightforwardly better than not doing any recovery at
all, this mode may be suboptimal for certain development or experimental purposes. In particular,
killing a master via does not clean up its recovery state, so whenever you
start a new Master, it will enter recovery mode. This could increase the startup time by up
to 1 minute if it needs to wait for all previously-registered Workers/clients to timeout.
+* While it's not officially supported, you could mount an NFS directory as the recovery directory.
If the original Master node dies completely, you could then start a Master on a different
node, which would correctly recover all previously registered Workers/clients (equivalent
to ZooKeeper recovery). Note, however, that you **cannot** have multiple Masters alive concurrently
using this approach; you need to upgrade to ZooKeeper to provide leader election for that
diff --git a/docs/ b/docs/
index c7df172..835b257 100644
--- a/docs/
+++ b/docs/
@@ -122,12 +122,12 @@ Spark Streaming features windowed computations, which allow you to apply
 <table class="table">
 <tr><th style="width:30%">Transformation</th><th>Meaning</th></tr>
-  <td> <b>window</b>(<i>windowDuration</i>, </i>slideDuration</i>)
+  <td> <b>window</b>(<i>windowDuration</i>, <i>slideDuration</i>)
   <td> Return a new DStream which is computed based on windowed batches of the source
DStream. <i>windowDuration</i> is the width of the window and <i>slideTime</i>
is the frequency during which the window is calculated. Both times must be multiples of the
batch interval.
-  <td> <b>countByWindow</b>(<i>windowDuration</i>, </i>slideDuration</i>)
+  <td> <b>countByWindow</b>(<i>windowDuration</i>, <i>slideDuration</i>)
   <td> Return a sliding count of elements in the stream. <i>windowDuration</i>
and <i>slideDuration</i> are exactly as defined in <code>window()</code>.
@@ -161,7 +161,6 @@ Spark Streaming features windowed computations, which allow you to apply
  <i>windowDuration</i> and <i>slideDuration</i> are exactly as defined
in <code>window()</code>.
 A complete list of DStream operations is available in the API documentation of [DStream](api/streaming/index.html#org.apache.spark.streaming.DStream)
and [PairDStreamFunctions](api/streaming/index.html#org.apache.spark.streaming.PairDStreamFunctions).
diff --git a/docs/ b/docs/
index 28d88a2..f491ae9 100644
--- a/docs/
+++ b/docs/
@@ -175,7 +175,7 @@ To further tune garbage collection, we first need to understand some basic
 * Java Heap space is divided in to two regions Young and Old. The Young generation is meant
to hold short-lived objects
   while the Old generation is intended for objects with longer lifetimes.
-* The Young generation is further divided into three regions [Eden, Survivor1, Survivor2].
+* The Young generation is further divided into three regions \[Eden, Survivor1, Survivor2\].
 * A simplified description of the garbage collection procedure: When Eden is full, a minor
GC is run on Eden and objects
   that are alive from Eden and Survivor1 are copied to Survivor2. The Survivor regions are
swapped. If an object is old

View raw message