spark-user mailing list archives

From Mich Talebzadeh <>
Subject Re: Understanding spark concepts cluster, master, slave, job, stage, worker, executor, task
Date Thu, 21 Jul 2016 11:56:06 GMT
I started putting together a Performance and Tuning guide for Spark,
starting from the simplest operations in Local and Standalone modes, but it
sounds like I never have the time to finish it!

This is some of that material; it is in Word and wrapped together in a somewhat
arbitrary way. Anyway, if you think it is useful let me know and I will try to
finish it :)

Some of the points we have already discussed in this user group or are part of
the wider available literature. It is aimed at practitioners.


According to the Spark website, Apache Spark <> is a
fast and general purpose engine for large-scale data processing. It is
written mostly in Scala, and provides APIs for Scala, Java and Python. It
is fully compatible with the Hadoop Distributed File System (HDFS); however, it
extends Hadoop’s core functionality by providing in-memory cluster
computation among other things.

In-memory computation is probably one of the most important aspects
of Spark. It also supports an advanced scheduler based on a directed acyclic
graph (DAG <>). These capabilities
allow Spark to be used as an advanced query engine with the help of the Spark
shell and Spark SQL. For near real time data processing, Spark Streaming can
be used. Another important but often understated capability of Spark is that
it can be deployed as an advanced execution engine for other Hadoop
tools such as Hive.

Like most of the tools in the Hadoop ecosystem, Spark requires careful
tuning to get the most out of it.

Thus, in these brief notes we aim to address these points to ensure
that you create an infrastructure for Spark that is not only performant
but also scalable for your needs.

*Why Spark*

The Hadoop ecosystem is nowadays crowded with a variety of offerings. Some
of them are complementary and others compete with each other. Spark
is unique in that, in a relatively short space of time, it has grown greatly in
popularity and as of today is one of the most popular tools in the
Hadoop ecosystem.

The fundamental technology of Hadoop, using the Map-Reduce algorithm as its core
execution engine, gave rise to the deployment of other methods. Although
Map-Reduce was and still is an incredible technology, it lacked the speed
and performance required for certain business needs, such as dealing with
real-time analytics. Spark was developed from the ground up to address these
needs.

*Overview of Spark Architecture*

Spark, much like many other tools, runs a set of instructions summarized in
the form of an application. An application consists of a Driver Program
that is responsible for submitting, running and monitoring the code.

Spark can distribute the workload across what is known as a cluster. In
other words, Spark applications run as independent sets of processes on a
cluster, where a process is an instance of an application running on a
UNIX/Linux system.

A *cluster* is a collection of servers, called nodes, that communicate with
each other to make a set of services highly available to the applications
running on them.

Before going any further, one can equate a node with a physical host, a VM
host or any other resource capable of providing RAM and cores. Some refer to
nodes as machines as well.

*Spark Operations*

Spark takes advantage of a cluster by dividing the workload across it
and executing operations in parallel to speed up the processing. To
effect this, Spark provides, *as part of its core architecture*, an
abstraction layer called a *Resilient Distributed Dataset (RDD)*. Simply
put, an RDD is a selection of elements (like a sequence, a text file, a CSV
file, or data coming in from streaming sources like Twitter, Kafka and so
forth) that one wants to work with.

What an RDD does is partition that data across the nodes of the Spark
cluster to take advantage of parallel processing.

*RDDs in Spark are immutable *meaning that *they cannot be changed, but can
be acted upon* to create other RDDs and result sets. RDDs can also be
cached in memory (to be precise in the memory allocated to Spark) across
multiple nodes for faster parallel operations.
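As a rough sketch of this immutability (assuming an existing SparkContext `sc`; the input path is hypothetical), each transformation returns a new RDD and the original is untouched:

```scala
// Each transformation below returns a NEW RDD; 'lines' itself is never modified.
val lines = sc.textFile("/data/input.txt")        // hypothetical input path
val words = lines.flatMap(_.split("\\s+"))        // new RDD derived from lines
val longWords = words.filter(_.length > 5)        // yet another new RDD

longWords.cache()              // keep partitions in Spark's allocated memory
println(longWords.count())     // first action computes and caches the RDD
println(longWords.count())     // subsequent actions read from the cache
```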

So that takes care of data. There is a second abstraction in Spark known as
*shared variables* that can be used in parallel operations. By default, when
Spark runs a function in parallel as a set of tasks on different nodes, it ships
a copy of each variable used in the function to each task. Sometimes, a
variable needs to be shared across tasks, or between tasks and the driver
program. *Spark supports two types of shared variables*: *broadcast
variables*, which can be used to cache a value in memory on all nodes, and
*accumulators*, which are variables that are only “added” to, such as
counters and sums. We will cover them later.
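As a minimal sketch of the two kinds of shared variables (assuming an existing SparkContext `sc`; the names are illustrative, and the accumulator API shown is the Spark 1.x one):

```scala
// Broadcast variable: a read-only lookup cached once per node,
// rather than shipped with every single task.
val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))

// Accumulator: tasks only "add" to it; the driver reads the total.
val missing = sc.accumulator(0, "missing keys")

sc.parallelize(Seq("a", "b", "c")).foreach { k =>
  if (!lookup.value.contains(k)) missing += 1
}
println(s"missing keys = ${missing.value}")
```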

We have been mentioning Spark clusters, but clusters can be configured
differently through what is known as the cluster manager. To this effect,
Spark currently supports the following:


   *Spark Local* - Spark runs on the local host. This is the simplest set
   up and best suited for learners who want to understand different concepts
   of Spark and those performing unit testing.

   *Spark Standalone *– a simple cluster manager included with Spark that
   makes it easy to set up a cluster.

   *YARN Cluster Mode* – the Spark driver runs inside an application master
   process which is managed by YARN on the cluster, and the client can go away
   after initiating the application. This is invoked with --master yarn and
   --deploy-mode cluster

   *YARN Client Mode* – the driver runs in the client process, and the
   application master is only used for requesting resources from YARN. Unlike
   Spark standalone mode, in which the master’s address is specified in the
   --master parameter, in YARN mode the ResourceManager’s address is picked
   up from the Hadoop configuration. Thus, the --master parameter is yarn. This
   is invoked with --deploy-mode client

Some points to bear in mind when choosing between client and cluster mode on
YARN:

- Client mode requires the process that launched the app to remain alive,
meaning the host where it lives has to stay alive; and it may not be
super-friendly to ssh sessions dying, for example, unless you use nohup.
- Client mode driver logs are printed to stderr by default. Yes, you can
change that, but in cluster mode they are all collected by YARN without any
user intervention.
- If your edge node (from where the app is launched) isn't really part of
the cluster (e.g., lives in an outside network with firewalls or higher
latency), you may run into issues.
- In cluster mode, your driver's CPU / memory usage is accounted for in
YARN; this matters if your edge node is part of the cluster (and could be
running YARN containers), since in client mode your driver will potentially
use a lot of memory / CPU.
- Finally, in cluster mode YARN can restart your application without user
interference. This is useful for things that need to stay up (think a long
running streaming job, for example).

*If your client is not close to the cluster (e.g. your PC) then you
definitely want to go cluster to improve performance. If your client is
close to the cluster (e.g. an edge node) then you could go either client or
cluster.  Note that by going client, more resources are going to be used on
the edge node.*

In this part one, we will confine ourselves to *Spark on the local host* and
will leave the other modes to later parts.

*Spark Local Mode*

Spark Local Mode is the simplest configuration of Spark and does not
require a cluster. The user can launch and experiment with Spark on the
local host.

In this mode the driver program (SparkSubmit), the resource manager and the
executor all exist within the same JVM. The JVM itself is the worker thread.

When you use spark-shell, or for that matter spark-sql, you are starting
spark-submit under the bonnet. These two shells are created to make life
easier when working with Spark.

However, if you look at what $SPARK_HOME/bin/spark-shell does in the
script, you will see my point:

"${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name
"Spark shell" "$@"

So that is basically a spark-submit JVM invoked with the name "Spark shell".

Since it uses spark-submit, it takes all the parameters related to
spark-submit. However, remember that these two shells are created for the
read–eval–print loop (REPL) and they default to Local mode. You cannot use
them, for example, in YARN cluster mode.

Some default parameters can be changed. For example, the default Web GUI port
for Spark is 4040. However, I start it on port 55555 and give it a different
name:

"${SPARK_HOME}"/bin/spark-submit --conf "spark.ui.port=55555" --class
org.apache.spark.repl.Main --name "my own Spark shell" "$@"

Before going further, let us understand the concept of cores and threads.
These days we talk about cores more than CPUs; each CPU comes with a number
of cores.

Simply put, to work out the number of threads you can do this:

cat /proc/cpuinfo|grep processor|wc -l

For me this returns 12, and that is all I need to know without worrying
what physical cores, logical cores and CPU really mean, as these definitions
may vary from one hardware vendor to another.
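As a cross-check (Linux-specific, and only a sketch): `nproc` reports the threads available to the current process, which can be lower than the /proc/cpuinfo count if the process has been pinned to a subset of CPUs:

```shell
# Threads the kernel exposes on this host
grep -c '^processor' /proc/cpuinfo

# Threads available to this process (usually the same number)
nproc
```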

In Local mode you have

--master local

This will start with one (worker) *thread*, equivalent to --master
local[1]. You can start more than one thread by specifying the number of
threads *k* in --master local[k]. You can also start using all available
threads with --master local[*]. The degree of parallelism is defined by the
number of threads *k*.

In *Local mode*, you do not need to start master and slaves/workers. In
this mode it is pretty simple and you can run as many JVMs (spark-submit)
as your resources allow (resources meaning memory and cores). Additionally,
the GUI starts by default on port 4040, the next one on 4041 and so forth,
unless you specifically start it with --conf "spark.ui.port=nnnnn"
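For example (illustrative only), two concurrent Local-mode shells could be kept apart on the UI side like this — the first takes the default port 4040, and the second is pinned explicitly rather than relying on the automatic fall-back to 4041:

```shell
# First app: Web GUI comes up on the default port 4040
${SPARK_HOME}/bin/spark-shell --master local[2]

# Second concurrent app: pin the Web GUI to a port of our choosing
${SPARK_HOME}/bin/spark-shell --master local[2] --conf "spark.ui.port=4041"
```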

Remember this is all about testing your apps. It is NOT a performance test.
What it allows you to do is test multiple apps concurrently and, more
importantly, get started and understand the various configuration
parameters that Spark uses together with the spark-submit executable.

You can of course use the spark-shell and spark-sql utilities. These in turn
rely on the spark-submit executable to run certain variations of the JVM. In
other words, you are still executing spark-submit. You can pass parameters
to spark-submit, as in the example below:

${SPARK_HOME}/bin/spark-submit \
                --packages com.databricks:spark-csv_2.11:1.3.0 \
                --driver-memory 2G \
                --num-executors 1 \
                --executor-memory 2G \
                --master local \
                --executor-cores 2 \
                --conf "spark.scheduler.mode=FAIR" \
                --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
                --conf "spark.ui.port=4040" \
                --conf "spark.driver.port=54631" \
                --conf "spark.fileserver.port=54731" \
                --conf "spark.blockManager.port=54832" \
                --conf "spark.kryoserializer.buffer.max=512" \
                --class "${FILE_NAME}" \
                --jars /home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar \
                ${JAR_FILE} \
                >> ${LOG_FILE}

*Note that in the above example I am only using modest resources. This is
intentional to ensure that resources are available for the other Spark jobs
that I may be testing on this standalone node.*

*Alternatively, you can specify some of these parameters when creating a new
SparkConf:*

val sparkConf = new SparkConf().
             setAppName("CEP_streaming").
             setMaster("local").
             set("spark.executor.instances", "1").
             set("spark.executor.memory", "2G").
             set("spark.executor.cores", "2").
             set("spark.cores.max", "2").
             set("spark.driver.allowMultipleContexts", "true").
             set("spark.hadoop.validateOutputSpecs", "false")

*You can run practically all of your unit testing in Local mode and exercise
a variety of operations, including running SQL queries, reading data from
CSV files, writing to HDFS, creating Hive tables (including ORC tables) and
doing Spark Streaming.*

*The components of a Spark App*

*Although this may be of little relevance to Local mode, it would be
beneficial to clarify a number of Spark terminologies here.*

A Spark application consists of a driver program and a list of executors.
The driver program is the main program, which coordinates the executors to
run the Spark application. Executors are worker-node processes in charge
of running individual tasks in a given Spark job; they run the tasks
assigned by the driver program. In Local mode, the driver program runs
inside the JVM on the local machine. There is only one executor, called
*driver*, and the tasks are executed by threads locally as well. This single
executor will be started with *k* threads.

Local mode is different from Standalone mode, which uses Spark's built-in
cluster set-up.

*Driver Program: *The driver is the process started by spark-submit. The
application relies on the initial Spark-specific environment settings in the
shell in which the application is started to create what is known as the
*SparkContext object*. SparkContext tells the driver program how to access
the Spark cluster, among other things. It is a separate Java process,
identified as *SparkSubmit* in jps.

*Standalone Master* is not required in Local mode

*Standalone Worker* is not required in Local

*Executor *is the program that is launched on the Worker when a Job starts

*Tasks *Each Spark application is broken down into stages, and each stage is
completed by one or more tasks. A task is a thread of execution that an
executor runs on a single node.

*Cache* is the memory allocated to this Spark process
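The stage/task breakdown can be seen in a classic word count (a sketch, assuming an existing SparkContext `sc` and a hypothetical input file): the shuffle required by reduceByKey splits the job into two stages, and each stage runs one task per partition:

```scala
val counts = sc.textFile("/data/input.txt")   // hypothetical input path
  .flatMap(_.split("\\s+"))                   // stage 1: narrow transformations
  .map(word => (word, 1))
  .reduceByKey(_ + _)                         // shuffle boundary => stage 2

counts.collect()                              // action: triggers the job
```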

Going back to Figure 1, we notice Cache, Executor and Tasks.
Figure 2 below shows a typical Spark master URL. Note the number of cores
and the memory allocation for each worker. These are the default maxima on
this host. Again, these are resource ceilings; it does not mean that the
workers go and grab those values.

*Figure 2: A typical Spark master URL*

Note that, as stated, each worker grabs all the available cores and allocates
the remaining memory on each host. However, these values are somewhat
misleading and are not updated, so I would not worry too much about what this
page says.

*Configuring Spark parameters*

To configure Spark shell parameters, you will need to modify the settings
in the $SPARK_HOME/conf/ script.

Note that the shells in $SPARK_HOME/sbin call the $SPARK_HOME/conf/
scripts. So if you modify this file, remember to restart your master and
slave routines.

Every Spark executor in an application has the same fixed number of
cores and the same fixed heap size. The number of cores can be specified with
the --executor-cores flag when invoking spark-submit, spark-shell and
pyspark from the command line, or by setting the spark.executor.cores
property in the spark-defaults.conf file or on a SparkConf object.
Similarly, the heap size can be controlled with the --executor-memory flag
or the spark.executor.memory property. The cores property controls the
number of concurrent tasks an executor can run: *--executor-cores 5
means that each executor can run a maximum of five tasks at the same time.*
The memory property impacts the amount of data Spark can cache, as well as
the maximum sizes of the shuffle data structures used for grouping,
aggregations, and joins.

*The **--num-executors** command-line flag or **spark.executor.instances**
configuration property controls the number of executors requested.* Starting
with CDH 5.4/Spark 1.3, you can avoid setting this property by
turning on dynamic allocation
with the spark.dynamicAllocation.enabled property. Dynamic allocation
enables a Spark application to request executors when there is a backlog of
pending tasks and to free up executors when they are idle.
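As a hedged sketch of a YARN submission with dynamic allocation turned on (the property names are the standard Spark ones; note that dynamic allocation also requires the external shuffle service to be enabled on the NodeManagers):

```shell
${SPARK_HOME}/bin/spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --conf "spark.dynamicAllocation.enabled=true" \
    --conf "spark.shuffle.service.enabled=true" \
    --conf "spark.dynamicAllocation.minExecutors=1" \
    --conf "spark.dynamicAllocation.maxExecutors=8" \
    --class "${FILE_NAME}" \
    ${JAR_FILE}
```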

*Resource scheduling*

The standalone cluster mode currently only supports a simple First In, First
Out (FIFO) scheduler across applications. Thus, to allow multiple concurrent
users, you can control the maximum resources each application
will use.

By default, the driver memory is 512M. You can increase this value by
setting the following parameter in $SPARK_HOME/conf/spark-defaults.conf:

spark.driver.memory         4g

or by supplying the configuration setting at runtime to spark-shell or
spark-submit.

*Note that in this mode a process will acquire all cores in the cluster,
which only makes sense if you just run one application at a time*.

You can cap the number of cores by setting spark.cores.max in your
SparkConf. For example:

  val conf = new SparkConf().
               setMaster("local[2]").
               set("spark.executor.memory", "4G").
               set("spark.cores.max", "2").
               set("spark.driver.allowMultipleContexts", "true")

  val sc = new SparkContext(conf)

Note that setMaster("local[2]") specifies that it is run locally with two
threads, where:

   local uses 1 thread.

   local[N] uses N threads.

   local[*] uses as many threads as there are cores.

However, since the driver-memory setting applies to the JVM itself, you will
need to set any non-default amount of driver memory *before starting the
JVM, by providing the new value:*

${SPARK_HOME}/bin/spark-shell --driver-memory 4g


${SPARK_HOME}/bin/spark-submit --driver-memory 4g

You can of course have simple SparkConf values and set the additional
Spark configuration parameters at submit time:


val sparkConf = new SparkConf().
             setMaster("local[2]").
             set("spark.streaming.concurrentJobs", "2").
             set("spark.driver.allowMultipleContexts", "true").
             set("spark.hadoop.validateOutputSpecs", "false")

And at submit time do

${SPARK_HOME}/bin/spark-submit \
                --master local[2] \
                --driver-memory 4G \
                --num-executors 1 \
                --executor-memory 4G \
                --executor-cores 2 \
                ${JAR_FILE}

Note that this will override the earlier Spark configuration parameters with
the values supplied at submit time.

*Resource Monitoring*

You can see the job progress in the Spark Job GUI, which by default runs on
port 4040. This GUI has different tabs for Jobs, Stages, Executors etc. An
example is shown below:

*Figure 3: A typical Spark Job URL*

Figure 3 shows the status of Jobs. This is a simple job that uses JDBC to
access an Oracle database and a table called dummy with 1 billion rows. It
then takes that table, caches it by registering it as a temporary table,
creates an ORC table in Hive and populates that table. It was compiled using
Maven and executed through $SPARK_HOME/sbin/

The code for ETL_scratchpad_dummy.scala is shown below:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._

object ETL_scratchpad_dummy {
  def main(args: Array[String]) {
  val conf = new SparkConf().
               setMaster("local[2]").
               set("spark.executor.memory", "4G").
               set("spark.cores.max", "2").
               set("spark.driver.allowMultipleContexts", "true")
  val sc = new SparkContext(conf)
  // Create sqlContext based on HiveContext
  val sqlContext = new HiveContext(sc)
  import sqlContext.implicits._
  val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

  println ("\nStarted at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy')").collect.foreach(println)

  HiveContext.sql("use oraclehadoop")

  var _ORACLEserver : String = "jdbc:oracle:thin:@rhes564:1521:mydb12"
  var _username : String = "scratchpad"
  var _password : String = "xxxxxx"

  // Get data from Oracle table scratchpad.dummy
  // (remaining columns in the dbtable query reconstructed from the INSERT list below)
  val d = HiveContext.load("jdbc",
  Map("url" -> _ORACLEserver,
      "dbtable" -> "(SELECT to_char(ID) AS ID, to_char(CLUSTERED) AS CLUSTERED, to_char(SCATTERED) AS SCATTERED, to_char(RANDOMISED) AS RANDOMISED, SMALL_VC, PADDING FROM scratchpad.dummy)",
      "user" -> _username,
      "password" -> _password))

  // Register the JDBC data as a temporary table so it can be queried below
  d.registerTempTable("tmp")

  // Need to create and populate target ORC table oraclehadoop.dummy
  HiveContext.sql("use oraclehadoop")

  // Drop and create table dummy
  HiveContext.sql("DROP TABLE IF EXISTS oraclehadoop.dummy")
  var sqltext : String = ""
  // (column types below are assumed; the archived message truncated this DDL)
  sqltext = """
  CREATE TABLE oraclehadoop.dummy (
     ID INT
   , CLUSTERED INT
   , SCATTERED INT
   , RANDOMISED INT
   , SMALL_VC VARCHAR(10)
   , PADDING  VARCHAR(10)
  )
  STORED AS ORC
  TBLPROPERTIES (
  "orc.row.index.stride"="10000" )
  """
  HiveContext.sql(sqltext)

  // Put data in Hive table. Clean up is already done
  sqltext = """
  INSERT INTO TABLE oraclehadoop.dummy
  SELECT
          ID
        , CLUSTERED
        , SCATTERED
        , RANDOMISED
        , SMALL_VC
        , PADDING
  FROM tmp
  """
  HiveContext.sql(sqltext)

  println ("\nFinished at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy')").collect.foreach(println)
 }
}

If you look at Figure 3 you will see the status of the job broken into *Active
Jobs* and *Completed Jobs* respectively. The description is pretty smart:
it tells you which line of code was executed. For example, “collect at
ETL_scratchpad_dummy.scala:24” refers to line 24 of the code, which is:

println ("\nStarted at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy')").collect.foreach(println)

This Job (Job Id 0) has already completed.

On the other hand, Active Job Id 1, “sql at ETL_scratchpad_dummy.scala:87”, is
currently running at line 87 of the code, which is:

  sqltext = """
  INSERT INTO TABLE oraclehadoop.dummy
  SELECT
          ID
        , CLUSTERED
        , SCATTERED
        , RANDOMISED
        , SMALL_VC
        , PADDING
  FROM tmp
  """

We can look at this job further by drilling down into the active job session
in the GUI through its stages.

*Figure 4: Drilling down to execution*



Dr Mich Talebzadeh

LinkedIn

*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.

On 21 July 2016 at 12:27, Joaquin Alzola <> wrote:

> You have the same as link 1 but in English?
>    - spark-questions-concepts
>    <>
>    - deep-into-spark-exection-model
>    <>
> Seems really interesting post but in Chinese. I suppose google translate
> suck on the translation.
> *From:* Taotao.Li []
> *Sent:* 21 July 2016 04:04
> *To:* Jean Georges Perrin <>
> *Cc:* Sachin Mittal <>; user <>
> *Subject:* Re: Understanding spark concepts cluster, master, slave, job,
> stage, worker, executor, task
> Hi, Sachin,  here are two posts about the basic concepts about spark:
>    - spark-questions-concepts
>    <>
>    - deep-into-spark-exection-model
>    <>
> And, I fully recommend databrick's post:
> On Thu, Jul 21, 2016 at 1:36 AM, Jean Georges Perrin <> wrote:
> Hey,
> I love when questions are numbered, it's easier :)
> 1) Yes (but I am not an expert)
> 2) You don't control... One of my process is going to 8k tasks, so...
> 3) Yes, if you have HT, it double. My servers have 12 cores, but HT, so it
> makes 24.
> 4) From my understanding: Slave is the logical computational unit and
> Worker is really the one doing the job.
> 5) Dunnoh
> 6) Dunnoh
> On Jul 20, 2016, at 1:30 PM, Sachin Mittal <> wrote:
> Hi,
> I was able to build and run my spark application via spark submit.
> I have understood some of the concepts by going through the resources at
> but few doubts still remain. I have few specific
> questions and would be glad if someone could share some light on it.
> So I submitted the application using spark.master    local[*] and I have a
> 8 core PC.
> - What I understand is that application is called as job. Since mine had
> two stages it gets divided into 2 stages and each stage had number of tasks
> which ran in parallel.
> Is this understanding correct.
> - What I notice is that each stage is further divided into 262 tasks From
> where did this number 262 came from. Is this configurable. Would increasing
> this number improve performance.
> - Also I see that the tasks are run in parallel in set of 8. Is this
> because I have a 8 core PC.
> - What is the difference or relation between slave and worker. When I did
> spark-submit did it start 8 slaves or worker threads?
> - I see all worker threads running in one single JVM. Is this because I
> did not start  slaves separately and connect it to a single master cluster
> manager. If I had done that then each worker would have run in its own JVM.
> - What is the relationship between worker and executor. Can a worker have
> more than one executors? If yes then how do we configure that. Does all
> executor run in the worker JVM and are independent threads.
> I suppose that is all for now. Would appreciate any response.Will add
> followup questions if any.
> Thanks
> Sachin
> --
> *___________________*
> Quant | Engineer | Boy
> *___________________*
> *blog*:
> <>
> *github*:
> This email is confidential and may be subject to privilege. If you are not
> the intended recipient, please do not copy or disclose its content but
> contact the sender immediately upon receipt.
