spark-user mailing list archives

From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: Understanding spark concepts cluster, master, slave, job, stage, worker, executor, task
Date Fri, 22 Jul 2016 13:29:50 GMT
4. Further I see some env variables like SPARK_EXECUTOR_INSTANCES,
SPARK_EXECUTOR_MEMORY and SPARK_EXECUTOR_CORES.
If we set these in the env where we start our worker, does that mean by
default all the jobs submitted take the values specified by these? Are
these options applicable in standalone mode?


Check the file $SPARK_HOME/conf/spark-env.sh under the standalone section:

# Options for the daemons used in the standalone deploy mode
# - SPARK_MASTER_IP, to bind the master to a different IP address or hostname
# - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports for the master
# - SPARK_MASTER_OPTS, to set config properties only for the master (e.g. "-Dx=y")
export SPARK_WORKER_CORES=2 ## to set the number of cores to use on this machine
export SPARK_WORKER_MEMORY=2g ## to set how much total memory workers have to give executors (e.g. 1000m, 2g)
# - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT, to use non-default ports for the worker
export SPARK_WORKER_INSTANCES=4 ## to set the number of worker processes per node
# - SPARK_WORKER_DIR, to set the working directory of worker processes
# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")
# - SPARK_DAEMON_MEMORY, to allocate to the master, worker and history server themselves (default: 1g).
# - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
# - SPARK_SHUFFLE_OPTS, to set config properties only for the external shuffle service (e.g. "-Dx=y")
# - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
# - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers
# Generic options for the daemons used in the standalone deploy mode
# - SPARK_CONF_DIR      Alternate conf dir. (Default: ${SPARK_HOME}/conf)
# - SPARK_LOG_DIR       Where log files are stored.  (Default: ${SPARK_HOME}/logs)
# - SPARK_PID_DIR       Where the pid file is stored. (Default: /tmp)
# - SPARK_IDENT_STRING  A string representing this instance of spark. (Default: $USER)
# - SPARK_NICENESS      The scheduling priority for daemons. (Default: 0)

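As a side note, and only as a hedged sketch (the master URL and numbers below are placeholders, not taken from this thread): the SPARK_EXECUTOR_* variables appear in spark-env.sh.template under the YARN options, whereas in standalone mode the per-application equivalents are normally supplied through spark-submit, spark-defaults.conf or SparkConf, e.g.:

import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: per-application resources on a standalone cluster.
// "spark://master-host:7077" is a placeholder for the real master URL.
val conf = new SparkConf()
  .setAppName("StandaloneResourceSketch")   // hypothetical app name
  .setMaster("spark://master-host:7077")
  .set("spark.executor.memory", "2g")       // heap per executor
  .set("spark.executor.cores", "2")         // cores per executor
  .set("spark.cores.max", "4")              // cap on total cores for this app
val sc = new SparkContext(conf)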

HTH




Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 22 July 2016 at 10:57, Sivakumaran S <siva.kumaran@me.com> wrote:

> 1. The --deploy-mode dictates whether to launch the driver program
> locally ("client") on the machine where spark-submit runs, or on one
> of the machines in the cluster. Consider the "cluster" option to be outsourcing
> the driver program to a different node in the cluster. Thus with
> cluster mode, you don't have to keep your local machine open or logged in
> because the driver program is running elsewhere.
>
> 2. If I am not wrong (I vaguely remember from the book), specifying an
> executor-memory requirement higher than what is physically available causes
> the job to fail.
>
>
> HTH
>
> Regards,
>
> Sivakumaran
>
> On 22-Jul-2016, at 10:45 AM, Sachin Mittal <sjmittal@gmail.com> wrote:
>
> Hi All,
> This has helped me a lot, especially the differences between local and
> standalone mode.
> In local mode everything is within the same JVM, and using local[*] you can
> control the number of cores/threads used to run the job/tasks.
>
> In standalone mode you start a master and one or more slaves (workers). Then you
> use spark-submit or the job's conf to control the number of executors, cores per
> executor and memory per executor when these get distributed to a worker. A
> worker can have one or more executors running, depending on the number of jobs
> and the jobs' executor configurations.
>
> A few things are still not clear to me (all about cluster mode,
> standalone or otherwise):
>
> 1. spark-submit or the driver program has a deploy-mode option of
> client/cluster.
>     I have not understood what the purpose of the master is here, when the
> main driver program does not run there, i.e. it is either running locally as
> a client or on the cluster. So basically, what is the purpose of the driver program
> and what is the purpose of the master?
>
>
> 2. The three options with which one controls the executors for a job:
> --num-executors 6
> --executor-memory 2G
> --executor-cores 2
> Say I set the above options for a job. What if my worker node has only
> 8 cores and a total of 8 GB RAM available?
> Clearly, as per this configuration, it will run 6 executors on each node and
> take 6 * 2 = 12 cores and 6 * 2 = 12 GB memory.
> This exceeds the physical capacity of that worker node, so what will
> happen in this case?
>
>
> 3. Also, what is advisable for controlling the cores a job should use:
> setting the --num-executors and --executor-cores options, or the
> --total-executor-cores option?
>
>
> 4. Further I see some env variables like SPARK_EXECUTOR_INSTANCES,  SPARK_EXECUTOR_MEMORY
> and SPARK_EXECUTOR_CORES.
> If we set these in the env where we start our worker, does that mean by
> default all the jobs submitted take the values specified by these? Are
> these options applicable in standalone mode?
>
>
> Thanks
> Sachin
>
>
> On Fri, Jul 22, 2016 at 12:38 PM, Mich Talebzadeh <
> mich.talebzadeh@gmail.com> wrote:
>
>> Your points:
>>
>> Here are some follow-up questions:
>> 1. In the option spark-submit --num-executors 2 --master local[*]
>> Say I have an 8-core PC; will there be 2 executors per worker and 8
>> workers, i.e. 16 threads overall, or
>> just 2 threads overall?
>>
>>
>> In this mode "Local", the driver program (SparkSubmit), the resource
>> manager and executor all exist within the same JVM. The JVM itself is the
>> worker thread.
>> There is only one driver, one executor and one JVM, all in one. You cannot
>> have two executors. As I said before, in Local mode you don't have master
>> and slaves (AKA workers). You can have more than one core, meaning that
>> the same code can run on sub-sections of data in parallel if applicable.
>> With 2 cores you can have two tasks, with 8 cores you can have 8 tasks.
>>
>> If you have an 8-core PC, meaning 8 logical processors/threads, then
>> --master local[8] or --master local[*] will use 8 cores. --master local[2]
>> will use two cores.
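>>
>> To illustrate (a minimal sketch with a made-up app name, not part of the
>> original exchange): in Local mode the number inside local[k] is the only
>> parallelism knob, so an RDD with more partitions than k is simply processed
>> in waves of k tasks.
>>
>> import org.apache.spark.{SparkConf, SparkContext}
>>
>> // Local mode sketch: driver and executor share this one JVM.
>> // local[2] gives two worker threads, so at most two tasks run at a time.
>> val conf = new SparkConf()
>>   .setAppName("LocalModeSketch")   // hypothetical name
>>   .setMaster("local[2]")
>> val sc = new SparkContext(conf)
>>
>> // 8 partitions on 2 threads => roughly four waves of 2 tasks each.
>> sc.parallelize(1 to 80, 8).map(_ * 2).count()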
>>
>> 2. What is driver-memory? Is this memory to be allocated to the master?
>>
>> It is the memory allocated to your spark-submit job, i.e. the driver; it is not
>> the master's. Remember a driver needs to be there to execute and coordinate your
>> Spark app. A master needs to be there when running Spark in "standalone" mode.
>>
>> 3. If executor-memory 1G and num-executors 2 and executor-cores 2,
>> does this mean that my worker node will utilize 2G of memory (2 x 1) and
>> 4 cores (out of, say, an 8-core PC)?
>>
>> With num-executors we are now talking about something other than
>> Local mode, say standalone mode. executor-memory is the memory allocated to
>> "each" executor. num-executors says that this job
>> (spark-submit) will use two executors. executor-cores=2 means that each
>> executor can perform parallel queries (tasks) on two subsets of data at the
>> same time. Each executor runs on one worker, and a worker can have more than
>> one executor running on it. You start workers by running
>> sbin/start-slaves.sh.
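>>
>> Putting some illustrative numbers on that (a hedged sketch, not from the
>> original message; the master URL and input path are placeholders): two
>> executors with two cores each give at most four concurrent tasks, however
>> many partitions the RDD has.
>>
>> import org.apache.spark.{SparkConf, SparkContext}
>>
>> // Hypothetical standalone job: 2 executors x 2 cores each => up to 4 tasks in flight.
>> // On a standalone cluster the executor count is effectively bounded by
>> // spark.cores.max / spark.executor.cores.
>> val conf = new SparkConf()
>>   .setAppName("ExecutorSizingSketch")
>>   .setMaster("spark://master-host:7077")   // placeholder master URL
>>   .set("spark.executor.memory", "1g")      // like --executor-memory 1G
>>   .set("spark.executor.cores", "2")        // like --executor-cores 2
>>   .set("spark.cores.max", "4")             // 2 executors x 2 cores
>> val sc = new SparkContext(conf)
>>
>> // A 12-partition RDD is then processed in roughly three waves of 4 tasks.
>> sc.textFile("hdfs:///tmp/some_input", 12).count()   // hypothetical path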
>>
>> HTH
>>
>>
>>
>> So when tuning, num-executors * executor-cores < total available cores on
>> that machine,
>> and executor-memory * num-executors < total available memory on that
>> machine.
>> Is that assumption correct?
>>
>> 3. What is the difference between SPARK_WORKER_CORES and executor-cores?
>>
>> 4. I have not understood the deploy-mode option and the concept of the driver
>> program.
>> Since we submit the application to the master, shouldn't the driver program
>> always run on the master?
>> If not, then what is the difference between the master and the driver?
>>
>> Dr Mich Talebzadeh
>>
>>
>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 22 July 2016 at 06:29, Sachin Jain <sachinjain024@gmail.com> wrote:
>>
>>> Hi Sachin Mittal,
>>>
>>> I would like answer this one (if not all)
>>>
>>> > 4. By default each worker thread has one executor which will run the
>>> task assigned to it.
>>>
>>> Yes, by default each worker thread starts one executor to run the tasks.
>>> A worker can have more than one executor, but there can be only one executor
>>> per job.
>>> Suppose you are running 2 Spark jobs on your cluster; then it is possible
>>> that a worker W1 has two executors, E1 and E2, for jobs J1 and J2 respectively.
>>>
>>> Hope it helps!!
>>>
>>> On Fri, Jul 22, 2016 at 10:54 AM, Sachin Mittal <sjmittal@gmail.com>
>>> wrote:
>>>
>>>> Hi All,
>>>> Thanks for the links and for clearing up some of the concepts.
>>>> I have read the links and resources and have developed my understanding
>>>> of Spark.
>>>> I am just presenting my understanding; please confirm if I am on the right
>>>> track.
>>>>
>>>> 1. A simple spark-submit configuration runs Spark in local mode.
>>>> 2. In local mode everything runs under one JVM.
>>>> 3. The --master local[*] configuration will start worker threads in that
>>>> JVM as per the * option.
>>>> 4. By default each worker thread has one executor which will run the
>>>> tasks assigned to it.
>>>> 5. In standalone mode you can create a cluster by starting a master and
>>>> starting one or more slaves.
>>>> 6. Slaves can be local to the master or run on a different PC accessible
>>>> from the master PC.
>>>> 7. A slave is a worker node and this node can have one or more executors.
>>>> 8. This can be configured using --num-executors and each executor will
>>>> run in its own thread.
>>>> 9. So say I have 3 workers and on each worker I can run say 7 executors;
>>>> that means I can run 21 tasks in parallel.
>>>> 10. In standalone mode I can submit a job directly to the master
>>>> cluster and it will take care of dividing the job/tasks among worker nodes.
>>>>     The way to do it would be spark-submit --master spark://<spark master
>>>> ip>:7077
>>>>
>>>>
>>>> I also have some follow-up questions. Basically I have not understood
>>>> whether you decide how many executors a worker (or slave, if worker <=>
>>>> slave is the correct understanding in standalone mode) will have when starting
>>>> the worker, or whether that decision is made via spark-submit, i.e. when
>>>> submitting the job.
>>>>
>>>> Here are some follow-up questions:
>>>> 1. In the option spark-submit --num-executors 2 --master local[*]
>>>> Say I have an 8-core PC; will there be 2 executors per worker and 8
>>>> workers, i.e. 16 threads overall, or
>>>> just 2 threads overall?
>>>>
>>>> 2. What is driver-memory? Is this memory to be allocated to the master?
>>>>
>>>> 3. If executor-memory 1G and num-executors 2 and executor-cores 2,
>>>> does this mean that my worker node will utilize 2G of memory (2 x 1)
>>>> and 4 cores (out of, say, an 8-core PC)?
>>>>
>>>> So when tuning, num-executors * executor-cores < total available cores on
>>>> that machine,
>>>> and executor-memory * num-executors < total available memory on that
>>>> machine.
>>>> Is that assumption correct?
>>>>
>>>> 3. What is the difference between SPARK_WORKER_CORES and executor-cores?
>>>>
>>>> 4. I have not understood the deploy-mode option and the concept of the driver
>>>> program.
>>>> Since we submit the application to the master, shouldn't the driver program
>>>> always run on the master?
>>>> If not, then what is the difference between the master and the driver?
>>>>
>>>> It would be great if any of you could shed some light on these topics.
>>>>
>>>> Thanks again
>>>> Sachin
>>>>
>>>>
>>>> On Thu, Jul 21, 2016 at 5:26 PM, Mich Talebzadeh <
>>>> mich.talebzadeh@gmail.com> wrote:
>>>>
>>>>> I started putting together a Performance and Tuning guide for Spark,
>>>>> starting from the simplest operation, Local and Standalone modes, but it sounds
>>>>> like I never have the time to finish it!
>>>>>
>>>>> This is some of it, but it is in Word and wrapped together in a somewhat
>>>>> arbitrary way. Anyway, if you think it is useful let me know and I will try to
>>>>> finish it :)
>>>>>
>>>>> Some of the points we have already discussed in this user group, or they are
>>>>> part of the wider available literature. It is aimed at practitioners.
>>>>>
>>>>> *Introduction*
>>>>> According to the Spark website, Apache Spark <http://spark.apache.org/> is
>>>>> a fast and general purpose engine for large-scale data processing. It is
>>>>> written mostly in Scala, and provides APIs for Scala, Java and Python. It
>>>>> is fully compatible with the Hadoop Distributed File System (HDFS); however, it
>>>>> extends Hadoop’s core functionality by providing in-memory cluster
>>>>> computation among other things.
>>>>> In-memory capability is probably one of the most important
>>>>> aspects of Spark, allowing one to do computation in memory. It also
>>>>> supports an advanced scheduler based on the directed acyclic graph (DAG
>>>>> <https://en.wikipedia.org/wiki/Directed_acyclic_graph>). These
>>>>> capabilities allow Spark to be used as an advanced query engine with the
>>>>> help of the Spark shell and Spark SQL. For near real-time data processing, Spark
>>>>> Streaming can be used. Another important but often understated capability
>>>>> of Spark is deploying it as an advanced execution engine for
>>>>> other Hadoop tools such as Hive.
>>>>> Like most of the tools in the Hadoop ecosystem, Spark will require careful
>>>>> tuning to get the most out of it.
>>>>> Thus, in these brief notes we will aim to address these points to
>>>>> ensure that you create an infrastructure for Spark, which is not only
>>>>> performant but also scalable for your needs.
>>>>> *Why Spark*
>>>>> The Hadoop ecosystem is nowadays crowded with a variety of offerings.
>>>>> Some of them are complementary and others are competing with each other.
>>>>> Spark is unique in that in a relatively short space of time it has grown
>>>>> greatly in popularity and as of today is one of the most popular tools in
>>>>> the Hadoop ecosystem.
>>>>> Hadoop's fundamental technology, using the Map-Reduce algorithm as its
>>>>> core execution engine, gave rise to the deployment of other methods. Although
>>>>> Map-Reduce was and still is an incredible technology, it lacked the speed
>>>>> and performance required for certain business needs like dealing with
>>>>> real-time analytics. Spark was developed from the ground up to address these
>>>>> concerns.
>>>>>
>>>>>
>>>>> *Overview of Spark Architecture*
>>>>>
>>>>> Spark much like many other tools runs a set of instructions summarized
>>>>> in the form of an application. An application consists of a Driver Program
>>>>> that is responsible for submitting, running and monitoring the code.
>>>>>
>>>>>
>>>>> Spark can distribute the workload across what is known as a cluster.
>>>>> In other words, Spark applications run as independent sets of
>>>>> processes on a cluster. A process is an application running on a UNIX/Linux
>>>>> system.
>>>>>
>>>>>
>>>>> A *cluster *is a collection of servers, called nodes, that communicate
>>>>> with each other to make a set of services highly available to the
>>>>> applications running on them.
>>>>>
>>>>>
>>>>> Before going any further, one can equate a node with a physical host, a
>>>>> VM host or any other resource capable of providing RAM and cores. Some refer
>>>>> to nodes as machines as well.
>>>>>
>>>>>
>>>>> *Spark Operations*
>>>>>
>>>>> Spark takes advantage of a cluster by dividing the workload across
>>>>> this cluster and executing operations in parallel to speed up the
>>>>> processing. To effect this, Spark provides, *as part of its core
>>>>> architecture*, an abstraction layer called a *Resilient Distributed
>>>>> Dataset (RDD)*. Simply put, an RDD is a selection of elements (like a
>>>>> sequence, a text file, a CSV file, data coming in from streaming sources
>>>>> like Twitter, Kafka and so forth) that one wants to work with.
>>>>>
>>>>>
>>>>> What an RDD does is to partition that data across the nodes of the
>>>>> Spark cluster to take advantage of parallel processing.
>>>>>
>>>>>
>>>>> *RDDs in Spark are immutable *meaning that *they cannot be changed,
>>>>> but can be acted upon* to create other RDDs and result sets. RDDs can
>>>>> also be cached in memory (to be precise in the memory allocated to Spark)
>>>>> across multiple nodes for faster parallel operations.
>>>>>
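>>>>> As a small illustration (a hedged sketch, not part of the original notes):
>>>>> creating an RDD, transforming it and caching it looks like this.
>>>>>
>>>>> import org.apache.spark.{SparkConf, SparkContext}
>>>>>
>>>>> val sc = new SparkContext(
>>>>>   new SparkConf().setAppName("RDDSketch").setMaster("local[2]"))
>>>>>
>>>>> // An RDD partitioned into 4 slices; transformations return new RDDs,
>>>>> // the original is never modified (immutability).
>>>>> val numbers = sc.parallelize(1 to 1000, 4)
>>>>> val squares = numbers.map(n => n * n).cache()   // keep in Spark's memory for reuse
>>>>>
>>>>> println(squares.count())          // first action materialises and caches the RDD
>>>>> println(squares.reduce(_ + _))    // second action reuses the cached partitions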
>>>>>
>>>>> So that takes care of data. There is a second abstraction in Spark
>>>>> known as *shared variables* that can be used in parallel operations.
>>>>> By default, when Spark runs a function in parallel as a set of tasks on
>>>>> different nodes, it ships a copy of each variable used in the function to
>>>>> each task. Sometimes, a variable needs to be shared across tasks, or between
>>>>> tasks and the driver program. *Spark supports two types of shared
>>>>> variables*: *broadcast variables*, which can be used to cache a value
>>>>> in memory on all nodes, and *accumulators*, which are variables that
>>>>> are only "added" to, such as counters and sums. We will cover them later.
>>>>>
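>>>>> A minimal sketch of the two shared-variable types (Spark 1.x API, with
>>>>> made-up values, added here only as an illustration):
>>>>>
>>>>> import org.apache.spark.{SparkConf, SparkContext}
>>>>>
>>>>> val sc = new SparkContext(
>>>>>   new SparkConf().setAppName("SharedVarsSketch").setMaster("local[2]"))
>>>>>
>>>>> // Broadcast variable: a read-only lookup table shipped once to every node.
>>>>> val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))
>>>>>
>>>>> // Accumulator: tasks only "add" to it; the driver reads the final value.
>>>>> val missing = sc.accumulator(0, "missing keys")
>>>>>
>>>>> val mapped = sc.parallelize(Seq("a", "b", "c")).map { k =>
>>>>>   lookup.value.getOrElse(k, { missing += 1; -1 })
>>>>> }
>>>>> println(mapped.collect().toList)   // List(1, 2, -1)
>>>>> println(missing.value)             // 1 (read after the action has run)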
>>>>>
>>>>> We have been mentioning Spark clusters, but clusters can be configured
>>>>> differently as well, through what is known as the cluster
>>>>> manager. To this effect, Spark currently supports the following
>>>>> configurations:
>>>>>
>>>>>    - *Spark Local* - Spark runs on the local host. This is the
>>>>>    simplest set up and best suited for learners who want to understand
>>>>>    different concepts of Spark and those performing unit testing.
>>>>>    - *Spark Standalone *– a simple cluster manager included with
>>>>>    Spark that makes it easy to set up a cluster.
>>>>>    - *YARN Cluster Mode,* the Spark driver runs inside an application
>>>>>    master process which is managed by YARN on the cluster, and the client can
>>>>>    go away after initiating the application. This is invoked with --master
>>>>>    yarn and --deploy-mode cluster
>>>>>    - *YARN Client Mode*, the driver runs in the client process, and
>>>>>    the application master is only used for requesting resources from YARN.
>>>>>    Unlike Spark standalone mode, in which the master’s address is
>>>>>    specified in the --master parameter, in YARN mode the
>>>>>    ResourceManager’s address is picked up from the Hadoop configuration. Thus,
>>>>>    the --master parameter is yarn. This is invoked with --deploy-mode
>>>>>    client
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> *- client mode requires the process that launched the app to remain
>>>>> alive, meaning the host where it lives has to stay alive, and it may not be
>>>>> super-friendly to ssh sessions dying, for example, unless you use nohup.*
>>>>> *- client mode driver logs are printed to stderr by default. Yes, you can
>>>>> change that, but in cluster mode they're all collected by YARN without any
>>>>> user intervention.*
>>>>> *- if your edge node (from where the app is launched) isn't really part
>>>>> of the cluster (e.g., lives in an outside network with firewalls or higher
>>>>> latency), you may run into issues.*
>>>>> *- in cluster mode, your driver's cpu / memory usage is accounted for in
>>>>> YARN; this matters if your edge node is part of the cluster (and could be
>>>>> running yarn containers), since in client mode your driver will potentially
>>>>> use a lot of memory / cpu.*
>>>>> *- finally, in cluster mode YARN can restart your application without user
>>>>> interference. This is useful for things that need to stay up (think a long
>>>>> running streaming job, for example).*
>>>>> *If your client is not close to the cluster (e.g. your PC) then you
>>>>> definitely want to go cluster to improve performance. If your client is
>>>>> close to the cluster (e.g. an edge node) then you could go either client or
>>>>> cluster. Note that by going client, more resources are going to be used on
>>>>> the edge node.*
>>>>> In this part one, we will confine ourselves to *Spark on Local host* and
>>>>> will leave the other two to later parts.
>>>>> *Spark Local Mode*
>>>>> Spark Local Mode is the simplest configuration of Spark that does not
>>>>> require a Cluster. The user on the local host can launch and experiment
>>>>> with Spark.
>>>>>
>>>>>
>>>>> In this mode the driver program (SparkSubmit), the resource manager
>>>>> and executor all exist within the same JVM. The JVM itself is the worker
>>>>> thread
>>>>> When you use spark-shell, or for that matter spark-sql, you are starting
>>>>> spark-submit under the bonnet. These two shells are created to make it
>>>>> easier to work with Spark.
>>>>>
>>>>>
>>>>> However, if you look at what $SPARK_HOME/bin/spark-shell does in the
>>>>> script, you will notice my point:
>>>>>
>>>>>
>>>>> "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main
>>>>> --name "Spark shell" "$@"
>>>>>
>>>>>
>>>>> So that is basically spark-submit JVM invoked with the name "Spark
>>>>> shell"
>>>>>
>>>>>
>>>>> Since it is using spark-submit it takes all the parameters related to
>>>>> spark-submit. However, remember that these two shells are created for
>>>>> read–eval–print loop (REPL) and they default to Local mode. You cannot use
>>>>> them for example in YARN cluster mode.
>>>>>
>>>>>
>>>>> Some default parameters can be changed. For example, the default web
>>>>> UI port for Spark is 4040. However, I start it on 55555 and have modified the
>>>>> invocation to give it a different name:
>>>>>
>>>>>
>>>>> "${SPARK_HOME}"/bin/spark-submit --conf "spark.ui.port=55555" --class
>>>>> org.apache.spark.repl.Main --name "my own Spark shell" "$@"
>>>>>
>>>>> Before going further let us understand the concept of cores and
>>>>> threads. These days we talk about cores more than CPUs. Each CPU comes with
>>>>> a number of cores.
>>>>> Simply put to work out the number of threads you can do this:
>>>>>
>>>>> cat /proc/cpuinfo|grep processor|wc -l
>>>>>
>>>>> Which for me returns 12, and that is all I need to know, without
>>>>> worrying about what physical cores, logical cores and CPUs really mean, as these
>>>>> definitions may vary from one hardware vendor to another.
>>>>> In local mode you have
>>>>> --master local
>>>>>
>>>>>
>>>>> This will start with one (worker) *thread*, equivalent to --master
>>>>> local[1]. You can start more than one thread by specifying the number of
>>>>> threads *k* in --master local[k]. You can also start using all
>>>>> available threads with --master local[*]. The degree of parallelism is
>>>>> defined by the number of threads *k*.
>>>>> In *Local mode*, you do not need to start master and slaves/workers.
>>>>> In this mode it is pretty simple and you can run as many JVMs
>>>>> (spark-submit) as your resources allow (resource meaning memory and cores).
>>>>> Additionally, the GUI starts by default on port 4040, next one on 4041 and
>>>>> so forth unless you specifically start it with --conf
>>>>> "spark.ui.port=nnnnn"
>>>>> Remember this is all about testing your apps. It is NOT a performance
>>>>> test. What it allows you to do is test multiple apps concurrently and, more
>>>>> importantly, it gets you started and helps you understand the various configuration
>>>>> parameters that Spark uses together with the spark-submit executable.
>>>>> You can of course use spark-shell and spark-sql utilities. These in
>>>>> turn rely on spark-submit executable to run certain variations of the JVM.
>>>>> In other words, you are still executing spark-submit. You can pass
>>>>> parameters to spark-submit with an example shown below:
>>>>> ${SPARK_HOME}/bin/spark-submit \
>>>>>                 --packages com.databricks:spark-csv_2.11:1.3.0 \
>>>>>                 --driver-memory 2G \
>>>>>                 --num-executors 1 \
>>>>>                 --executor-memory 2G \
>>>>>                 --master local \
>>>>>                 --executor-cores 2 \
>>>>>                 --conf "spark.scheduler.mode=FAIR" \
>>>>>                 --conf
>>>>> "spark.executor.extraJavaOptions=-XX:+PrintGCDetails
>>>>> -XX:+PrintGCTimeStamps" \
>>>>>                 --jars
>>>>> /home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar \
>>>>>                 --class "${FILE_NAME}" \
>>>>>                 --conf "spark.ui.port=4040" \
>>>>>                 --conf "spark.driver.port=54631" \
>>>>>                 --conf "spark.fileserver.port=54731" \
>>>>>                 --conf "spark.blockManager.port=54832" \
>>>>>                 --conf "spark.kryoserializer.buffer.max=512" \
>>>>>                 ${JAR_FILE} \
>>>>>                 >> ${LOG_FILE}
>>>>>
>>>>> *Note that in the above example I am only using modest resources. This
>>>>> is intentional to ensure that resources are available for the other Spark
>>>>> jobs that I may be testing on this standalone node.*
>>>>>
>>>>> *Alternatively, you can specify some of these parameters when you are
>>>>> creating a new SparkConf:*
>>>>>
>>>>> val sparkConf = new SparkConf().
>>>>>              setAppName("CEP_streaming").
>>>>>              setMaster("local").
>>>>>              set("spark.executor.instances", "1").
>>>>>              set("spark.executor.memory", "2G").
>>>>>              set("spark.executor.cores", "2").
>>>>>              set("spark.cores.max", "2").
>>>>>              set("spark.driver.allowMultipleContexts", "true").
>>>>>              set("spark.hadoop.validateOutputSpecs", "false")
>>>>>
>>>>>
>>>>>
>>>>> *You can practically run most of your unit testing in Local mode and
>>>>> deploy a variety of options, including running SQL queries, reading data from
>>>>> CSV files, writing to HDFS, creating Hive tables including ORC tables and
>>>>> doing Spark Streaming.*
>>>>>
>>>>> *The components of a Spark App*
>>>>>
>>>>> *Although this may be of little relevance to Local mode, it would be
>>>>> beneficial to clarify a number of Spark terminologies here.*
>>>>>
>>>>> A Spark application consists of a driver program and a list of
>>>>> executors. The driver program is the main program, which coordinates the
>>>>> executors to run the Spark application. Executors are worker-node
>>>>> processes in charge of running individual tasks in a given Spark job; they
>>>>> run the tasks assigned by the driver program. In Local
>>>>> mode, the driver program runs inside the JVM on the local machine.
>>>>> There is only one executor, it is called
>>>>> *driver*, and the tasks are executed by the threads locally as well.
>>>>> This single executor will be started with *k* threads.
>>>>>
>>>>> Local mode is different from Standalone mode, which uses Spark's built-in
>>>>> cluster set-up.
>>>>>
>>>>> *Driver Program: *The driver is the process started by spark-submit.
>>>>> The application relies on the initial Spark-specific environment settings in
>>>>> the shell where the application is started to create what is known as the *SparkContext
>>>>> object*. The SparkContext tells the driver program how to access the
>>>>> Spark cluster among other things. It is a separate Java process. It is
>>>>> identified as *SparkSubmit* in jps.
>>>>>
>>>>> *Standalone Master* is not required in Local mode.
>>>>>
>>>>> *Standalone Worker* is not required in Local mode.
>>>>> *Executor *is the program that is launched on the Worker when a job
>>>>> starts executing.
>>>>> *Tasks: *Each Spark application is broken down into stages and each
>>>>> stage is completed by one or more tasks. A task is a thread of execution
>>>>> that an executor runs on a single node.
>>>>> *Cache* is the memory allocated to this Spark process.
>>>>>
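>>>>> To make the job/stage/task breakdown concrete, here is a hedged
>>>>> word-count sketch (the input path is hypothetical): a shuffle boundary such
>>>>> as reduceByKey splits the job into two stages, and each stage runs one task
>>>>> per partition.
>>>>>
>>>>> import org.apache.spark.{SparkConf, SparkContext}
>>>>>
>>>>> val sc = new SparkContext(
>>>>>   new SparkConf().setAppName("StageSketch").setMaster("local[2]"))
>>>>>
>>>>> // Stage 1: read + flatMap + map are pipelined into one stage
>>>>> // (one task per partition). Stage 2: reduceByKey forces a shuffle,
>>>>> // so a second stage starts.
>>>>> val counts = sc.textFile("hdfs:///tmp/input.txt", 4)   // hypothetical path, 4 partitions
>>>>>   .flatMap(_.split("\\s+"))
>>>>>   .map(word => (word, 1))
>>>>>   .reduceByKey(_ + _)
>>>>>
>>>>> counts.collect().foreach(println)   // the action that triggers the job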
>>>>>
>>>>> Going back to Figure 1, we notice Cache, Executor and Tasks. These are
>>>>> as follows:
>>>>> Figure 2 below shows a typical Spark master URL. Note the number of
>>>>> cores and the memory allocation for each worker. These are the default maxima on
>>>>> this host. Again, these are resource ceilings; it does not mean that the workers
>>>>> go and grab those values.
>>>>>
>>>>> *Figure 2: A typical Spark master URL*
>>>>>
>>>>>
>>>>> Note that, as stated, each worker grabs all the available cores and
>>>>> allocates the remaining memory on each host. However, these values are
>>>>> somewhat misleading and are not updated, so I would not worry too much about
>>>>> what this page says.
>>>>>
>>>>>
>>>>> *Configuring Spark parameters*
>>>>> To configure Spark shell parameters, you will need to modify the
>>>>> settings in the $SPARK_HOME/conf/spark-env.sh script.
>>>>> Note that the shells in $SPARK_HOME/sbin call the
>>>>> $SPARK_HOME/conf/spark-env.sh script, so if you modify this file, remember
>>>>> to restart your master and slaves’ routines.
>>>>>
>>>>> Every Spark executor in an application has the same fixed number
>>>>> of cores and the same fixed heap size. The number of cores can be specified
>>>>> with the --executor-cores flag when invoking spark-submit,
>>>>> spark-shell, and pyspark from the command line, or by setting the
>>>>> spark.executor.cores property in the spark-defaults.conf file or on a
>>>>> SparkConf object. Similarly, the heap size can be controlled with the
>>>>> --executor-memory flag or the spark.executor.memory property. The
>>>>> cores property controls the number of concurrent tasks an executor
>>>>> can run*. **--executor-cores 5** means that each executor can run a
>>>>> maximum of five tasks at the same time.* The memory property impacts
>>>>> the amount of data Spark can cache, as well as the maximum sizes of the
>>>>> shuffle data structures used for grouping, aggregations, and joins.
>>>>>
>>>>> *The **--num-executors** command-line flag or *
>>>>> *spark.executor.instances** configuration property control the number
>>>>> of executors requeste*d. Starting in CDH 5.4/Spark 1.3, you will be
>>>>> able to avoid setting this property by turning on dynamic allocation
>>>>> <https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation>
>>>>> with the spark.dynamicAllocation.enabled property. Dynamic allocation
>>>>> enables a Spark application to request executors when there is a backlog of
>>>>> pending tasks and free up executors when idle.
>>>>>
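>>>>> A hedged sketch of what turning that on could look like (property names
>>>>> as in the Spark docs; the limits and master URL are placeholders, and the
>>>>> external shuffle service must also be running on the workers):
>>>>>
>>>>> import org.apache.spark.{SparkConf, SparkContext}
>>>>>
>>>>> // Illustrative dynamic allocation settings; the numbers are placeholders.
>>>>> val conf = new SparkConf()
>>>>>   .setAppName("DynamicAllocationSketch")
>>>>>   .setMaster("spark://master-host:7077")                // placeholder
>>>>>   .set("spark.dynamicAllocation.enabled", "true")
>>>>>   .set("spark.shuffle.service.enabled", "true")         // required companion setting
>>>>>   .set("spark.dynamicAllocation.minExecutors", "1")
>>>>>   .set("spark.dynamicAllocation.maxExecutors", "8")
>>>>> val sc = new SparkContext(conf)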
>>>>>
>>>>> *Resource scheduling*
>>>>> The standalone cluster mode currently only supports a simple First In
>>>>> First Out (FIFO) scheduler across applications. Thus to allow multiple
>>>>> concurrent users, you can control the maximum number of resources each
>>>>> application will use.
>>>>> By default, the driver memory used is 512M. You can increase this
>>>>> value by setting the following parameter in $SPARK_HOME/conf/
>>>>> spark-defaults.conf
>>>>> spark.driver.memory         4g
>>>>> or by supplying the configuration setting at runtime to spark-shell or
>>>>> spark-submit.
>>>>> *Note that in this mode a process will acquire all cores in the
>>>>> cluster, which only makes sense if you just run one application at a time*.
>>>>>
>>>>> You can cap the number of cores by setting spark.cores.max in your
>>>>> SparkConf. For example:
>>>>>
>>>>>   val conf = new SparkConf().
>>>>>
>>>>>                setAppName("MyApplication").
>>>>>
>>>>>                setMaster("local[2]").
>>>>>
>>>>>                set("spark.executor.memory", "4G").
>>>>>
>>>>>                set("spark.cores.max", "2").
>>>>>
>>>>>                set("spark.driver.allowMultipleContexts", "true")
>>>>>
>>>>>   val sc = new SparkContext(conf)
>>>>>
>>>>>
>>>>> Note that setMaster("local[2]") specifies that it runs locally with
>>>>> two threads:
>>>>>
>>>>>    - local uses 1 thread.
>>>>>    - local[N] uses N threads.
>>>>>    - local[*] uses as many threads as there are cores.
>>>>>
>>>>>
>>>>> However, since the driver-memory setting encapsulates the JVM, you will
>>>>> need to set the amount of driver memory for any non-default value *before
>>>>> starting the JVM, by providing the new value:*
>>>>>
>>>>> ${SPARK_HOME}/bin/spark-shell --driver-memory 4g
>>>>>
>>>>> Or
>>>>>
>>>>> ${SPARK_HOME}/bin/spark-submit --driver-memory 4g
>>>>>
>>>>>
>>>>> You can of course have simple SparkConf values and set the
>>>>> additional Spark configuration parameters at submit time.
>>>>>
>>>>>
>>>>> Example
>>>>>
>>>>>
>>>>> val sparkConf = new SparkConf().
>>>>>
>>>>>              setAppName("CEP_streaming").
>>>>>
>>>>>              setMaster("local[2]").
>>>>>
>>>>>              set("spark.streaming.concurrentJobs", "2").
>>>>>
>>>>>              set("spark.driver.allowMultipleContexts", "true").
>>>>>
>>>>>              set("spark.hadoop.validateOutputSpecs", "false")
>>>>>
>>>>>
>>>>> And at submit time do
>>>>>
>>>>>
>>>>> ${SPARK_HOME}/bin/spark-submit \
>>>>>
>>>>>                 --master local[2] \
>>>>>
>>>>>                 --driver-memory 4G \
>>>>>
>>>>>                 --num-executors 1 \
>>>>>
>>>>>                 --executor-memory 4G \
>>>>>
>>>>>                 --executor-cores 2 \
>>>>>
>>>>>                 …..
>>>>>
>>>>> Note that this will override the earlier Spark configuration parameters
>>>>> set with sparkConf.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> *Resource Monitoring*
>>>>>
>>>>> You can see the job progress in the Spark Job GUI, which by default runs on <HOST>:4040.
>>>>> This GUI has different tabs for Jobs, Stages, Executors etc. An example is
>>>>> shown below:
>>>>>
>>>>>
>>>>> *Figure 3: A typical Spark Job URL*
>>>>>
>>>>>
>>>>> Figure 3 shows the status of Jobs. This is a simple job that uses JDBC
>>>>> to access an Oracle database and a table called dummy with 1 billion rows. It
>>>>> then takes that table, caches it by registering it as a temporary table, creates an
>>>>> ORC table in Hive and populates that table. It was compiled using Maven and
>>>>> executed through $SPARK_HOME/bin/spark-submit.
>>>>>
>>>>>
>>>>> The code is shown below: for ETL_scratchpad_dummy.scala
>>>>>
>>>>> import org.apache.spark.SparkContext
>>>>>
>>>>> import org.apache.spark.SparkConf
>>>>>
>>>>> import org.apache.spark.sql.Row
>>>>>
>>>>> import org.apache.spark.sql.hive.HiveContext
>>>>>
>>>>> import org.apache.spark.sql.types._
>>>>>
>>>>> import org.apache.spark.sql.SQLContext
>>>>>
>>>>> import org.apache.spark.sql.functions._
>>>>>
>>>>> object ETL_scratchpad_dummy {
>>>>>
>>>>>   def main(args: Array[String]) {
>>>>>
>>>>>   val conf = new SparkConf().
>>>>>
>>>>>                setAppName("ETL_scratchpad_dummy").
>>>>>
>>>>>                setMaster("local[2]").
>>>>>
>>>>>                set("spark.executor.memory", "4G").
>>>>>
>>>>>                set("spark.cores.max", "2").
>>>>>
>>>>>                set("spark.driver.allowMultipleContexts", "true")
>>>>>
>>>>>   val sc = new SparkContext(conf)
>>>>>
>>>>>   // Create sqlContext based on HiveContext
>>>>>
>>>>>   val sqlContext = new HiveContext(sc)
>>>>>
>>>>>   import sqlContext.implicits._
>>>>>
>>>>>   val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>>>
>>>>>   println ("\nStarted at"); sqlContext.sql("SELECT
>>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>>> ").collect.foreach(println)
>>>>>
>>>>>
>>>>>   HiveContext.sql("use oraclehadoop")
>>>>>
>>>>>
>>>>>   var _ORACLEserver : String = "jdbc:oracle:thin:@rhes564:1521:mydb12"
>>>>>
>>>>>   var _username : String = "scratchpad"
>>>>>
>>>>>   var _password : String = "xxxxxx"
>>>>>
>>>>>
>>>>>
>>>>>   // Get data from Oracle table scratchpad.dummy
>>>>>
>>>>>
>>>>>   val d = HiveContext.load("jdbc",
>>>>>
>>>>>   Map("url" -> _ORACLEserver,
>>>>>
>>>>>   "dbtable" -> "(SELECT to_char(ID) AS ID, to_char(CLUSTERED) AS
>>>>> CLUSTERED, to_char(SCATTERED) AS SCATTERED, to_char(RANDOMISED) AS
>>>>> RANDOMISED, RANDOM_STRING, SMALL_VC, PADDING FROM scratchpad.dummy)",
>>>>>
>>>>>   "user" -> _username,
>>>>>
>>>>>   "password" -> _password))
>>>>>
>>>>>
>>>>>   d.registerTempTable("tmp")
>>>>>
>>>>>   //
>>>>>
>>>>>   // Need to create and populate target ORC table oraclehadoop.dummy
>>>>>
>>>>>   //
>>>>>
>>>>>   HiveContext.sql("use oraclehadoop")
>>>>>
>>>>>   //
>>>>>
>>>>>   // Drop and create table dummy
>>>>>
>>>>>   //
>>>>>
>>>>>   HiveContext.sql("DROP TABLE IF EXISTS oraclehadoop.dummy")
>>>>>
>>>>>   var sqltext : String = ""
>>>>>
>>>>>   sqltext = """
>>>>>
>>>>>   CREATE TABLE oraclehadoop.dummy (
>>>>>
>>>>>      ID INT
>>>>>
>>>>>    , CLUSTERED INT
>>>>>
>>>>>    , SCATTERED INT
>>>>>
>>>>>    , RANDOMISED INT
>>>>>
>>>>>    , RANDOM_STRING VARCHAR(50)
>>>>>
>>>>>    , SMALL_VC VARCHAR(10)
>>>>>
>>>>>    , PADDING  VARCHAR(10)
>>>>>
>>>>>   )
>>>>>
>>>>>   CLUSTERED BY (ID) INTO 256 BUCKETS
>>>>>
>>>>>   STORED AS ORC
>>>>>
>>>>>   TBLPROPERTIES (
>>>>>
>>>>>   "orc.create.index"="true",
>>>>>
>>>>>   "orc.bloom.filter.columns"="ID",
>>>>>
>>>>>   "orc.bloom.filter.fpp"="0.05",
>>>>>
>>>>>   "orc.compress"="SNAPPY",
>>>>>
>>>>>   "orc.stripe.size"="16777216",
>>>>>
>>>>>   "orc.row.index.stride"="10000" )
>>>>>
>>>>>   """
>>>>>
>>>>>    HiveContext.sql(sqltext)
>>>>>
>>>>>   //
>>>>>
>>>>>   // Put data in Hive table. Clean up is already done
>>>>>
>>>>>   //
>>>>>
>>>>>   sqltext = """
>>>>>
>>>>>   INSERT INTO TABLE oraclehadoop.dummy
>>>>>
>>>>>   SELECT
>>>>>
>>>>>           ID
>>>>>
>>>>>         , CLUSTERED
>>>>>
>>>>>         , SCATTERED
>>>>>
>>>>>         , RANDOMISED
>>>>>
>>>>>         , RANDOM_STRING
>>>>>
>>>>>         , SMALL_VC
>>>>>
>>>>>         , PADDING
>>>>>
>>>>>   FROM tmp
>>>>>
>>>>>   """
>>>>>
>>>>>    HiveContext.sql(sqltext)
>>>>>
>>>>>   println ("\nFinished at"); sqlContext.sql("SELECT
>>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>>> ").collect.foreach(println)
>>>>>
>>>>>   sys.exit()
>>>>>
>>>>>  }
>>>>>
>>>>> }
>>>>>
>>>>>
>>>>> If you look at Figure 3 you will see the status of the job broken into *Active
>>>>> Jobs* and *Completed Jobs* respectively. The description is pretty
>>>>> smart: it tells you which line of code was executed. For example, “collect
>>>>> at ETL_scratchpad_dummy.scala:24” refers to line 24 of the code, which is
>>>>> below:
>>>>>
>>>>> println ("\nStarted at"); sqlContext.sql("SELECT
>>>>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>>>>> ").collect.foreach(println)
>>>>>
>>>>>
>>>>> This job (Job Id 0) has already completed.
>>>>>
>>>>> On the other hand, active Job Id 1, “sql at
>>>>> ETL_scratchpad_dummy.scala:87”, is currently running at line 87 of the
>>>>> code, which is:
>>>>>
>>>>>   sqltext = """
>>>>>
>>>>>   INSERT INTO TABLE oraclehadoop.dummy
>>>>>
>>>>>   SELECT
>>>>>
>>>>>           ID
>>>>>
>>>>>         , CLUSTERED
>>>>>
>>>>>         , SCATTERED
>>>>>
>>>>>         , RANDOMISED
>>>>>
>>>>>         , RANDOM_STRING
>>>>>
>>>>>         , SMALL_VC
>>>>>
>>>>>         , PADDING
>>>>>
>>>>>   FROM tmp
>>>>>
>>>>>   """
>>>>>
>>>>>    HiveContext.sql(sqltext)
>>>>>
>>>>>
>>>>> We can look at this job further by looking at the active job session
>>>>> in the GUI through its stages.
>>>>>
>>>>>
>>>>> *Figure 4: Drilling down to execution*
>>>>>
>>>>>
>>>>>  ...................
>>>>>
>>>>>
>>>>> HTH
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>>
>>>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>
>>>>>
>>>>> http://talebzadehmich.wordpress.com
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>> On 21 July 2016 at 12:27, Joaquin Alzola <Joaquin.Alzola@lebara.com>
>>>>> wrote:
>>>>>
>>>>>> Do you have the same as link 1 but in English?
>>>>>>
>>>>>>    - spark-questions-concepts
>>>>>>    <http://litaotao.github.io/spark-questions-concepts?s=gmail>
>>>>>>    - deep-into-spark-exection-model
>>>>>>    <http://litaotao.github.io/deep-into-spark-exection-model?s=gmail>
>>>>>>
>>>>>> It seems a really interesting post, but it is in Chinese. I suppose Google
>>>>>> Translate does a poor job on the translation.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> *From:* Taotao.Li [mailto:charles.upboy@gmail.com]
>>>>>> *Sent:* 21 July 2016 04:04
>>>>>> *To:* Jean Georges Perrin <jgp@jgp.net>
>>>>>> *Cc:* Sachin Mittal <sjmittal@gmail.com>; user <user@spark.apache.org
>>>>>> >
>>>>>> *Subject:* Re: Understanding spark concepts cluster, master, slave,
>>>>>> job, stage, worker, executor, task
>>>>>>
>>>>>>
>>>>>>
>>>>>> Hi, Sachin,  here are two posts about the basic concepts about spark:
>>>>>>
>>>>>>
>>>>>>
>>>>>>    - spark-questions-concepts
>>>>>>    <http://litaotao.github.io/spark-questions-concepts?s=gmail>
>>>>>>    - deep-into-spark-exection-model
>>>>>>    <http://litaotao.github.io/deep-into-spark-exection-model?s=gmail>
>>>>>>
>>>>>>
>>>>>>
>>>>>> And, I fully recommend databrick's post:
>>>>>> https://databricks.com/blog/2016/06/22/apache-spark-key-terms-explained.html
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Jul 21, 2016 at 1:36 AM, Jean Georges Perrin <jgp@jgp.net>
>>>>>> wrote:
>>>>>>
>>>>>> Hey,
>>>>>>
>>>>>>
>>>>>>
>>>>>> I love when questions are numbered, it's easier :)
>>>>>>
>>>>>>
>>>>>>
>>>>>> 1) Yes (but I am not an expert)
>>>>>>
>>>>>> 2) You don't control... One of my process is going to 8k tasks, so...
>>>>>>
>>>>>> 3) Yes, if you have HT (hyper-threading), it doubles. My servers have 12 cores, but with HT
>>>>>> it makes 24.
>>>>>>
>>>>>> 4) From my understanding: the slave is the logical computational unit and
>>>>>> the worker is really the one doing the job.
>>>>>>
>>>>>> 5) Dunnoh
>>>>>>
>>>>>> 6) Dunnoh
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Jul 20, 2016, at 1:30 PM, Sachin Mittal <sjmittal@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I was able to build and run my spark application via spark submit.
>>>>>>
>>>>>> I have understood some of the concepts by going through the resources
>>>>>> at https://spark.apache.org but a few doubts still remain. I have a few
>>>>>> specific questions and would be glad if someone could shed some light on
>>>>>> them.
>>>>>>
>>>>>> So I submitted the application using spark.master    local[*] and I
>>>>>> have an 8-core PC.
>>>>>>
>>>>>>
>>>>>> - What I understand is that the application is called a job. Since mine
>>>>>> had two stages, it got divided into 2 stages and each stage had a number of
>>>>>> tasks which ran in parallel.
>>>>>>
>>>>>> Is this understanding correct?
>>>>>>
>>>>>>
>>>>>>
>>>>>> - What I notice is that each stage is further divided into 262 tasks.
>>>>>> Where did this number 262 come from? Is this configurable? Would
>>>>>> increasing this number improve performance?
>>>>>>
>>>>>> - Also, I see that the tasks are run in parallel in sets of 8. Is this
>>>>>> because I have an 8-core PC?
>>>>>>
>>>>>> - What is the difference or relation between slave and worker? When I
>>>>>> did spark-submit, did it start 8 slaves or worker threads?
>>>>>>
>>>>>> - I see all worker threads running in one single JVM. Is this because
>>>>>> I did not start slaves separately and connect them to a single master
>>>>>> cluster manager? If I had done that, then would each worker have run in its
>>>>>> own JVM?
>>>>>>
>>>>>> - What is the relationship between worker and executor? Can a worker
>>>>>> have more than one executor? If yes, then how do we configure that? Do
>>>>>> all executors run in the worker JVM as independent threads?
>>>>>>
>>>>>> I suppose that is all for now. Would appreciate any response. Will add
>>>>>> follow-up questions if any.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> Sachin
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> *___________________*
>>>>>>
>>>>>> Quant | Engineer | Boy
>>>>>>
>>>>>> *___________________*
>>>>>>
>>>>>> *blog*:    http://litaotao.github.io
>>>>>> <http://litaotao.github.io/?utm_source=spark_mail>
>>>>>>
>>>>>> *github*: www.github.com/litaotao
>>>>>> This email is confidential and may be subject to privilege. If you
>>>>>> are not the intended recipient, please do not copy or disclose its content
>>>>>> but contact the sender immediately upon receipt.
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
>
