From: Mich Talebzadeh
Date: Thu, 21 Jul 2016 12:56:06 +0100
To: Joaquin Alzola
Cc: "Taotao.Li", Jean Georges Perrin, Sachin Mittal, user@spark.apache.org
Subject: Re: Understanding spark concepts cluster, master, slave, job, stage, worker, executor, task

I started putting together a Performance and Tuning guide for Spark, starting from the simplest operations in Local and Standalone modes, but it seems I never have the time to finish it!

This is some of that material; it is in Word and stitched together in a somewhat arbitrary way. Anyway, if you think it is useful, let me know and I will try to finish it :)

Some of the points we have already discussed in this user group, or they are part of the wider available literature. It is aimed at practitioners.

*Introduction*

According to the Spark website, Apache Spark is a fast and general-purpose engine for large-scale data processing. It is written mostly in Scala, and provides APIs for Scala, Java and Python. It is fully compatible with the Hadoop Distributed File System (HDFS); however, it extends Hadoop's core functionality by providing in-memory cluster computation, among other things.

Providing in-memory capability is probably one of the most important aspects of Spark, as it allows computation to be done in memory. Spark also supports an advanced scheduler based on a directed acyclic graph (DAG). These capabilities allow Spark to be used as an advanced query engine with the help of the Spark shell and Spark SQL. For near real-time data processing, Spark Streaming can be used. Another important but often understated capability of Spark is deploying it as an advanced execution engine for other Hadoop tools such as Hive.

Like most tools in the Hadoop ecosystem, Spark will require careful tuning to get the most out of it. Thus, in these brief notes, we will aim to address these points to ensure that you create an infrastructure for Spark that is not only performant but also scalable for your needs.

*Why Spark*

The Hadoop ecosystem is nowadays crowded with a variety of offerings. Some of them are complementary and others compete with each other. Spark is unique in that, in a relatively short space of time, it has grown greatly in popularity and is today one of the most popular tools in the Hadoop ecosystem.

The fundamental technology of Hadoop, using the Map-Reduce algorithm as its core execution engine, gave rise to the deployment of other methods. Although Map-Reduce was and still is an incredible technology, it lacked the speed and performance required for certain business needs, such as dealing with real-time analytics. Spark was developed from the ground up to address these concerns.

*Overview of Spark Architecture*

Spark, much like many other tools, runs a set of instructions summarized in the form of an application. An application consists of a Driver Program that is responsible for submitting, running and monitoring the code. Spark can distribute the workload across what is known as a cluster; a minimal example of such an application is sketched below.
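As an illustration only (a minimal sketch that is not part of the original guide; the object and application names are made up), here is what such an application can look like in Scala. The driver program builds a SparkContext and submits work that Spark then executes in parallel:

import org.apache.spark.{SparkConf, SparkContext}

object MinimalApp {
  def main(args: Array[String]): Unit = {
    // The driver program starts here: it describes the work to be done
    val conf = new SparkConf().setAppName("MinimalApp").setMaster("local[2]")
    val sc   = new SparkContext(conf)

    // parallelize() partitions the data; filter() and count() run as parallel tasks
    val evens = sc.parallelize(1 to 100000).filter(_ % 2 == 0).count()
    println(s"Number of even values: $evens")

    sc.stop()
  }
}

Packaged as a jar and run with spark-submit (or pasted into spark-shell without the object wrapper), this is the driver program; the RDD operations inside it are what Spark farms out as tasks.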
In other words, Spark applications run as independent sets of processes on a cluster. A process here is simply an application running on a UNIX/Linux system.

A *cluster* is a collection of servers, called nodes, that communicate with each other to make a set of services highly available to the applications running on them.

Before going any further, one can equate a node with a physical host, a VM host or any other resource capable of providing RAM and cores. Some refer to nodes as machines as well.

*Spark Operations*

Spark takes advantage of a cluster by dividing the workload across the cluster and executing operations in parallel to speed up the processing. To effect this, Spark provides, *as part of its core architecture*, an abstraction layer called a *Resilient Distributed Dataset (RDD)*. Simply put, an RDD is a selection of elements (like a sequence, a text file, a CSV file, data coming in from streaming sources such as Twitter or Kafka, and so forth) that one wants to work with. What an RDD does is partition that data across the nodes of the Spark cluster to take advantage of parallel processing.

*RDDs in Spark are immutable*, meaning that *they cannot be changed, but they can be acted upon* to create other RDDs and result sets. RDDs can also be cached in memory (to be precise, in the memory allocated to Spark) across multiple nodes for faster parallel operations.

So that takes care of data. There is a second abstraction in Spark known as *shared variables* that can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. *Spark supports two types of shared variables*: *broadcast variables*, which can be used to cache a value in memory on all nodes, and *accumulators*, which are variables that are only "added" to, such as counters and sums. We will cover them in more detail later; the short sketch below gives a first flavour.
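A minimal sketch only (not from the original guide; it assumes a SparkContext named sc is already available, as in the earlier example, and the variable names are made up):

    // Broadcast variable: a read-only lookup table cached once per node
    // instead of being shipped with every task
    val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))

    // Accumulator: tasks can only add to it; the driver reads the total
    val misses = sc.accumulator(0L, "lookup misses")

    val codes = sc.parallelize(Seq("a", "b", "c", "a")).map { key =>
      if (!lookup.value.contains(key)) misses += 1L
      lookup.value.getOrElse(key, -1)
    }.collect()

    // Note: for fully reliable counts, accumulators are best updated inside
    // actions (e.g. foreach) rather than transformations, since retried tasks
    // may apply the update more than once
    println(s"codes = ${codes.mkString(",")}, lookup misses = ${misses.value}")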
We have been mentioning Spark clusters, but clusters can be configured differently through what is known as the cluster manager. To this effect, Spark currently supports the following configurations:

- *Spark Local* - Spark runs on the local host. This is the simplest set-up and is best suited for learners who want to understand the different concepts of Spark and for those performing unit testing.

- *Spark Standalone* - a simple cluster manager included with Spark that makes it easy to set up a cluster.

- *YARN Cluster Mode* - the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. This is invoked with --master yarn and --deploy-mode cluster.

- *YARN Client Mode* - the driver runs in the client process, and the application master is only used for requesting resources from YARN. Unlike Spark Standalone mode, in which the master's address is specified in the --master parameter, in YARN mode the ResourceManager's address is picked up from the Hadoop configuration; thus, the --master parameter is yarn. This is invoked with --deploy-mode client.

*Some practical notes on client versus cluster mode:*

*- Client mode requires the process that launched the application to remain alive. This means the host it lives on has to stay alive, and it may not be super-friendly to ssh sessions dying, for example, unless you use nohup.

- Client mode driver logs are printed to stderr by default. Yes, you can change that, but in cluster mode they are all collected by YARN without any user intervention.

- If your edge node (from where the application is launched) is not really part of the cluster (e.g. it lives in an outside network with firewalls or higher latency), you may run into issues.

- In cluster mode, your driver's CPU and memory usage is accounted for in YARN; this matters if your edge node is part of the cluster (and could be running YARN containers), since in client mode your driver will potentially use a lot of memory and CPU.

- Finally, in cluster mode YARN can restart your application without user interference. This is useful for things that need to stay up (think of a long-running streaming job, for example).*

*If your client is not close to the cluster (e.g. your PC) then you definitely want to go cluster mode to improve performance. If your client is close to the cluster (e.g. an edge node) then you could go either client or cluster. Note that by going client, more resources are going to be used on the edge node.*

In this part one, we will confine ourselves to *Spark on the local host* and will leave the other two to later parts.

*Spark Local Mode*

Spark Local Mode is the simplest configuration of Spark and does not require a cluster. The user on the local host can launch and experiment with Spark.

In this mode the driver program (SparkSubmit), the resource manager and the executor all exist within the same JVM. The JVM itself is the worker thread.

When you use spark-shell, or for that matter spark-sql, you are starting spark-submit under the bonnet. These two shells were created to make life easier when working with Spark. However, if you look at what $SPARK_HOME/bin/spark-shell does in the script, you will notice my point:

"${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"

So that is basically the spark-submit JVM invoked with the name "Spark shell". Since it is using spark-submit, it takes all the parameters related to spark-submit. However, remember that these two shells are created for a read-eval-print loop (REPL) and they default to Local mode. You cannot use them, for example, in YARN cluster mode.

Some default parameters can be changed. For example, the default web GUI port for Spark is 4040. Here I start it on port 55555 and also give it a different name:

"${SPARK_HOME}"/bin/spark-submit --conf "spark.ui.port=55555" --class org.apache.spark.repl.Main --name "my own Spark shell" "$@"

Before going further, let us understand the concept of cores and threads. These days we talk about cores more than CPUs, and each CPU comes with a number of cores. Simply put, to work out the number of threads you can do this:

cat /proc/cpuinfo|grep processor|wc -l

which for me returns 12, and that is all I need to know, without worrying about what physical cores, logical cores and CPUs really mean, as these definitions may vary from one hardware vendor to another.

In local mode you have:

--master local

This will start with one (worker) *thread*, which is equivalent to --master local[1]. You can start with more than one thread by specifying the number of threads *k* in --master local[k]. You can also start using all available threads with --master local[*]. The degree of parallelism is defined by the number of threads *k*.

In *Local mode*, you do not need to start master and slaves/workers. In this mode it is pretty simple and you can run as many JVMs (spark-submit) as your resources allow (resources meaning memory and cores); a quick way to check the thread count you asked for is sketched below.
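A minimal sketch only (not from the original guide; the object name is made up), confirming from inside the application which master setting is in effect and how many threads Spark will use by default:

import org.apache.spark.{SparkConf, SparkContext}

object ThreadCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ThreadCheck").setMaster("local[4]"))

    println(sc.master)              // prints local[4]
    // For local[k] this normally reports k, unless spark.default.parallelism overrides it
    println(sc.defaultParallelism)

    sc.stop()
  }
}

You can run the same two println lines directly in spark-shell, where sc is already created for you.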
Additionally, the GUI starts by default on port 4040, the next one on 4041 and so forth, unless you specifically start it with --conf "spark.ui.port=nnnnn".

Remember, this is all about testing your apps. It is NOT a performance test. What it allows you to do is test multiple apps concurrently and, more importantly, get started and understand the various configuration parameters that Spark uses together with the spark-submit executable.

You can of course use the spark-shell and spark-sql utilities. These in turn rely on the spark-submit executable to run certain variations of the JVM. In other words, you are still executing spark-submit. You can pass parameters to spark-submit, with an example shown below:

${SPARK_HOME}/bin/spark-submit \
                --packages com.databricks:spark-csv_2.11:1.3.0 \
                --driver-memory 2G \
                --num-executors 1 \
                --executor-memory 2G \
                --master local \
                --executor-cores 2 \
                --conf "spark.scheduler.mode=FAIR" \
                --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
                --jars /home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar \
                --class "${FILE_NAME}" \
                --conf "spark.ui.port=4040" \
                --conf "spark.driver.port=54631" \
                --conf "spark.fileserver.port=54731" \
                --conf "spark.blockManager.port=54832" \
                --conf "spark.kryoserializer.buffer.max=512" \
                ${JAR_FILE} \
                >> ${LOG_FILE}

*Note that in the above example I am only using modest resources. This is intentional, to ensure that resources are available for the other Spark jobs that I may be testing on this standalone node.*

*Alternatively, you can specify some of these parameters when you are creating a new SparkConf:*

val sparkConf = new SparkConf().
        setAppName("CEP_streaming").
        setMaster("local").
        set("spark.executor.instances", "1").
        set("spark.executor.memory", "2G").
        set("spark.executor.cores", "2").
        set("spark.cores.max", "2").
        set("spark.driver.allowMultipleContexts", "true").
        set("spark.hadoop.validateOutputSpecs", "false")

*You can practically run most of your unit testing with Local mode and exercise a variety of options, including running SQL queries, reading data from CSV files, writing to HDFS, creating Hive tables (including ORC tables) and doing Spark Streaming.*

*The components of a Spark App*

*Although this may be of little relevance to Local mode, it would be beneficial to clarify a number of Spark terminologies here.*

A Spark application consists of a driver program and a list of executors. The driver program is the main program, which coordinates the executors to run the Spark application. Executors are worker nodes' processes in charge of running individual tasks in a given Spark job; they run the tasks assigned to them by the driver program. In Local mode, the driver program runs inside the JVM on the local machine. There is only one executor, it is called the *driver*, and the tasks are executed by threads locally as well. This single executor will be started with *k* threads. Local mode is different from Standalone mode, which uses Spark's in-built cluster set-up.

*Driver Program:* The driver is the process started by spark-submit. The application relies on the initial Spark-specific environment settings in the shell where the application is started to create what is known as the *SparkContext object*. SparkContext tells the driver program how to access the Spark cluster, among other things. It is a separate Java process.
It is identified as *SparkSubmit* in jps.

*Standalone Master* is not required in Local mode.

*Standalone Worker* is not required in Local mode.

*Executor* is the program that is launched on the Worker when a Job starts executing.

*Tasks:* Each Spark application is broken down into stages, and each stage is completed by one or more tasks. A task is a thread of execution that an executor runs on a single node.

*Cache* is the memory allocated to this Spark process.

Going back to Figure 1 (not reproduced here), we notice the Cache, Executor and Tasks; these are as described above.

Figure 2 below shows a typical Spark master URL. Note the number of cores and the memory allocation for each worker. These are the default maximums on this host. Again, these are resource ceilings; it does not mean that the workers go and grab those values.

*Figure 2: A typical Spark master URL*

Note that, as stated, each worker grabs all the available cores and allocates the remaining memory on each host. However, these values are somewhat misleading and are not updated, so I would not worry too much about what it says on that page.

*Configuring Spark parameters*

To configure Spark shell parameters, you will need to modify the settings in the $SPARK_HOME/conf/spark-env.sh script. Note that the shells in $SPARK_HOME/sbin call the $SPARK_HOME/conf/spark-env.sh script, so if you modify this file, remember to restart your master and slave routines.

Every Spark executor in an application has the same fixed number of cores and the same fixed heap size. The number of cores can be specified with the --executor-cores flag when invoking spark-submit, spark-shell and pyspark from the command line, or by setting the spark.executor.cores property in the spark-defaults.conf file or on a SparkConf object. Similarly, the heap size can be controlled with the --executor-memory flag or the spark.executor.memory property. The cores property controls the number of concurrent tasks an executor can run: *--executor-cores 5 means that each executor can run a maximum of five tasks at the same time.* The memory property impacts the amount of data Spark can cache, as well as the maximum sizes of the shuffle data structures used for grouping, aggregations and joins.

*The --num-executors command-line flag or the spark.executor.instances configuration property controls the number of executors requested.* Starting in CDH 5.4/Spark 1.3, you can avoid setting this property by turning on dynamic allocation with the spark.dynamicAllocation.enabled property. Dynamic allocation enables a Spark application to request executors when there is a backlog of pending tasks and to free up executors when they are idle.

*Resource scheduling*

The Standalone cluster mode currently only supports a simple First In First Out (FIFO) scheduler across applications. Thus, to allow multiple concurrent users, you can control the maximum amount of resources each application will use. By default, the memory used is 512M. You can increase this value by setting the following parameter in $SPARK_HOME/conf/spark-defaults.conf:

spark.driver.memory        4g

or by supplying the configuration setting at runtime to spark-shell or spark-submit.

*Note that in this mode a process will acquire all cores in the cluster by default, which only makes sense if you run just one application at a time.* You can cap the number of cores by setting spark.cores.max in your SparkConf. For example:

  val conf = new SparkConf().
               setAppName("MyApplication").
               setMaster("local[2]").
               set("spark.executor.memory", "4G").
               set("spark.cores.max", "2").
set("spark.driver.allowMultipleContexts", "true") val sc =3D new SparkContext(conf) Note that setMaster("local[2]"). Specifies that it is run locally with two threads - local uses 1 thread. - local[N] uses N threads. - local[*] uses as many threads as there are cores. However, since driver-memory setting encapsulates the JVM, you will need to set the amount of driver memory for any non-default value *before starting JVM by providing the new value:* ${SPARK_HOME}/bin/spark-shell --driver-memory 4g Or ${SPARK_HOME}/bin/spark-submit --driver-memory 4g You can of course have a simple SparkConf values and set the additional Spark configuration parameters at submit time Example val sparkConf =3D new SparkConf(). setAppName("CEP_streaming"). * setMaster("local[2]").* set("spark.streaming.concurrentJobs", "2"). set("spark.driver.allowMultipleContexts", "true"). set("spark.hadoop.validateOutputSpecs", "false") And at submit time do ${SPARK_HOME}/bin/spark-submit \ --master local[2] \ --driver-memory 4G \ --num-executors 1 \ --executor-memory 4G \ --executor-cores 2 \ =E2=80=A6.. Note that this will override earlier Spark configuration parameters with sparkConf *Resource Monitoring* You can see the job progress in Spark Job GUI that by default runs on :4040. This GUI has different tabs for Jobs, Stages, Executors etc. An example is shown below: *Figure 3: A typical Spark Job URL* Figure 3 shows the status of Jobs. This is a simple job that uses JDBC to access Oracle database and a table called dummy with 1 billion rows. It then takes that table, caches it by registering it as temptable, create an ORC table in Hive and populates that table. It was compiled using Maven and executed through $SPARK_HOME/sbin/spark-submit.sh The code is shown below: for ETL_scratchpad_dummy.scala import org.apache.spark.SparkContext import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.types._ import org.apache.spark.sql.SQLContext import org.apache.spark.sql.functions._ object ETL_scratchpad_dummy { def main(args: Array[String]) { val conf =3D new SparkConf(). setAppName("ETL_scratchpad_dummy"). setMaster("local[2]"). set("spark.executor.memory", "4G"). set("spark.cores.max", "2"). 
set("spark.driver.allowMultipleContexts", "true") val sc =3D new SparkContext(conf) // Create sqlContext based on HiveContext val sqlContext =3D new HiveContext(sc) import sqlContext.implicits._ val HiveContext =3D new org.apache.spark.sql.hive.HiveContext(sc) println ("\nStarted at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println) HiveContext.sql("use oraclehadoop") var _ORACLEserver : String =3D "jdbc:oracle:thin:@rhes564:1521:mydb12" var _username : String =3D "scratchpad" var _password : String =3D "xxxxxx" // Get data from Oracle table scratchpad.dummy val d =3D HiveContext.load("jdbc", Map("url" -> _ORACLEserver, "dbtable" -> "(SELECT to_char(ID) AS ID, to_char(CLUSTERED) AS CLUSTERED, to_char(SCATTERED) AS SCATTERED, to_char(RANDOMISED) AS RANDOMISED, RANDOM_STRING, SMALL_VC, PADDING FROM scratchpad.dummy)", "user" -> _username, "password" -> _password)) d.registerTempTable("tmp") // // Need to create and populate target ORC table oraclehadoop.dummy // HiveContext.sql("use oraclehadoop") // // Drop and create table dummy // HiveContext.sql("DROP TABLE IF EXISTS oraclehadoop.dummy") var sqltext : String =3D "" sqltext =3D """ CREATE TABLE oraclehadoop.dummy ( ID INT , CLUSTERED INT , SCATTERED INT , RANDOMISED INT , RANDOM_STRING VARCHAR(50) , SMALL_VC VARCHAR(10) , PADDING VARCHAR(10) ) CLUSTERED BY (ID) INTO 256 BUCKETS STORED AS ORC TBLPROPERTIES ( "orc.create.index"=3D"true", "orc.bloom.filter.columns"=3D"ID", "orc.bloom.filter.fpp"=3D"0.05", "orc.compress"=3D"SNAPPY", "orc.stripe.size"=3D"16777216", "orc.row.index.stride"=3D"10000" ) """ HiveContext.sql(sqltext) // // Put data in Hive table. Clean up is already done // sqltext =3D """ INSERT INTO TABLE oraclehadoop.dummy SELECT ID , CLUSTERED , SCATTERED , RANDOMISED , RANDOM_STRING , SMALL_VC , PADDING FROM tmp """ HiveContext.sql(sqltext) println ("\nFinished at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println) sys.exit() } } If you look at Figure 3 you will the status of the job broken into *Active Jobs* and *Completed Jobs *respectively. The description is pretty smart. It tells you which line of code was executed. For example =E2=80=9Ccollect = at ETL_scratchpad_dummy.scala:24=E2=80=9D refers to line 24 of the code which = is below: println ("\nStarted at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println) This Job (Job Id 0) is already completed On the other hand Active Job Id 1 =E2=80=9Csql at ETL_scratchpad_dummy.scal= a:87=E2=80=9D is currently running at line 87 of the code which is sqltext =3D """ INSERT INTO TABLE oraclehadoop.dummy SELECT ID , CLUSTERED , SCATTERED , RANDOMISED , RANDOM_STRING , SMALL_VC , PADDING FROM tmp """ HiveContext.sql(sqltext) We can look at this job further by looking at the active job session in GUI though stages *Figure 4: Drilling down to execution* ................... HTH Dr Mich Talebzadeh LinkedIn * https://www.linkedin.com/profile/view?id=3DAAEAAAAWh2gBxianrbJd6= zP6AcPCCdOABUrV8Pw * http://talebzadehmich.wordpress.com *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. 
On 21 July 2016 at 12:27, Joaquin Alzola wrote:

> Do you have the same as link 1 but in English?
>
> - spark-questions-concepts
>
> - deep-into-spark-exection-model
>
> Seems a really interesting post but it is in Chinese. I suppose Google Translate
> does a poor job on the translation.
>
>
> *From:* Taotao.Li [mailto:charles.upboy@gmail.com]
> *Sent:* 21 July 2016 04:04
> *To:* Jean Georges Perrin
> *Cc:* Sachin Mittal; user
> *Subject:* Re: Understanding spark concepts cluster, master, slave, job,
> stage, worker, executor, task
>
> Hi, Sachin, here are two posts about the basic concepts of Spark:
>
> - spark-questions-concepts
>
> - deep-into-spark-exection-model
>
> And I fully recommend Databricks' post:
> https://databricks.com/blog/2016/06/22/apache-spark-key-terms-explained.html
>
>
> On Thu, Jul 21, 2016 at 1:36 AM, Jean Georges Perrin wrote:
>
> Hey,
>
> I love when questions are numbered, it's easier :)
>
> 1) Yes (but I am not an expert)
>
> 2) You don't control... One of my processes is going to 8k tasks, so...
>
> 3) Yes, if you have HT, it doubles. My servers have 12 cores, but HT, so it
> makes 24.
>
> 4) From my understanding: Slave is the logical computational unit and
> Worker is really the one doing the job.
>
> 5) Dunnoh
>
> 6) Dunnoh
>
> On Jul 20, 2016, at 1:30 PM, Sachin Mittal wrote:
>
> Hi,
>
> I was able to build and run my spark application via spark-submit.
>
> I have understood some of the concepts by going through the resources at
> https://spark.apache.org but a few doubts still remain. I have a few specific
> questions and would be glad if someone could shed some light on them.
>
> So I submitted the application using spark.master local[*] and I have an
> 8 core PC.
>
> - What I understand is that the application is called a job. Since mine had
> two stages, it gets divided into 2 stages and each stage had a number of tasks
> which ran in parallel. Is this understanding correct?
>
> - What I notice is that each stage is further divided into 262 tasks. Where
> did this number 262 come from? Is this configurable? Would increasing
> this number improve performance?
>
> - Also I see that the tasks are run in parallel in sets of 8. Is this
> because I have an 8 core PC?
>
> - What is the difference or relation between slave and worker? When I did
> spark-submit, did it start 8 slaves or worker threads?
>
> - I see all worker threads running in one single JVM. Is this because I
> did not start slaves separately and connect them to a single master cluster
> manager? If I had done that, would each worker have run in its own JVM?
>
> - What is the relationship between worker and executor? Can a worker have
> more than one executor? If yes, then how do we configure that? Do all
> executors run in the worker JVM as independent threads?
>
> I suppose that is all for now. Would appreciate any response. Will add
> follow-up questions if any.
>
> Thanks
>
> Sachin
>
>
> --
>
> *___________________*
>
> Quant | Engineer | Boy
>
> *___________________*
>
> *blog*: http://litaotao.github.io
>
> *github*: www.github.com/litaotao
>
> This email is confidential and may be subject to privilege. If you are not
> the intended recipient, please do not copy or disclose its content but
> contact the sender immediately upon receipt.