spark-issues mailing list archives

From "Eyal Zituny (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-21443) Very long planning duration for queries with lots of operations
Date Sun, 23 Jul 2017 05:51:02 GMT

    [ https://issues.apache.org/jira/browse/SPARK-21443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16097521#comment-16097521 ]

Eyal Zituny commented on SPARK-21443:
-------------------------------------

Disabling this option indeed fixed my problem.
I would suggest adding a log message when such a rule takes more than a second (or even less), saying that there is an option to disable the rule, since otherwise it isn't easy to understand what is going on and how it can be fixed.
BTW, when I looked for a Spark conf that might help with this issue, I went over all the confs starting with "spark.sql.optimizer" and obviously didn't notice this one.
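
For anyone hitting the same slowdown: assuming the option discussed above is spark.sql.constraintPropagation.enabled (the flag gating constraint propagation, and hence InferFiltersFromConstraints; it does not live under the spark.sql.optimizer prefix, which is why scanning that prefix misses it), a minimal sketch of turning it off:

{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("DisableConstraintPropagation")
  .master("local[*]")
  // Assumption: this is the flag referred to in the comment above.
  .config("spark.sql.constraintPropagation.enabled", "false")
  .getOrCreate()

// It can also be toggled at runtime on an existing session.
spark.conf.set("spark.sql.constraintPropagation.enabled", "false")

// Scanning only the spark.sql.optimizer.* confs, as described above, will not surface it.
spark.conf.getAll.filter { case (k, _) => k.startsWith("spark.sql.optimizer") }.foreach(println)
{code}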

> Very long planning duration for queries with lots of operations
> ---------------------------------------------------------------
>
>                 Key: SPARK-21443
>                 URL: https://issues.apache.org/jira/browse/SPARK-21443
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL, Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Eyal Zituny
>
> Creating a streaming query with a large number of operations and fields (100+) results in a very long query planning phase. In the example below, the planning phase took 35 seconds while the actual batch execution took only 1.3 seconds.
> After some investigation, I found that the root cause is two optimizer rules which seem to take most of the planning time: InferFiltersFromConstraints and PruneFilters.
> I would suggest the following:
> # fix the inefficient optimizer rules
> # add warn-level logging if a rule has taken more than xx ms
> # allow custom removal of optimizer rules (the opposite of spark.experimental.extraOptimizations; see the sketch after the quoted report)
> # reuse query plans (optional) where possible
> Reproducing this issue can be done with the script below, which simulates the scenario:
> {code:scala}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.execution.streaming.MemoryStream
> import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}
> import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQueryListener}
> case class Product(pid: Long, name: String, price: Long, ts: Long = System.currentTimeMillis())
> case class Events (eventId: Long, eventName: String, productId: Long) {
> 	def this(id: Long) = this(id, s"event$id", id%100)
> }
> object SparkTestFlow {
> 	def main(args: Array[String]): Unit = {
> 		val spark = SparkSession
> 		  .builder
> 		  .appName("TestFlow")
> 		  .master("local[8]")
> 		  .getOrCreate()
> 		spark.sqlContext.streams.addListener(new StreamingQueryListener {
> 			override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
> 			override def onQueryProgress(event: QueryProgressEvent): Unit = {
> 				if (event.progress.numInputRows>0) {
> 					println(event.progress.toString())
> 				}
> 			}
> 			override def onQueryStarted(event: QueryStartedEvent): Unit = {}
> 		})
> 		
> 		import spark.implicits._
> 		implicit val sclContext = spark.sqlContext
> 		import org.apache.spark.sql.functions.expr
> 		val seq = (1L to 100L).map(i => Product(i, s"name$i", 10L*i))
> 		val lookupTable = spark.createDataFrame(seq)
> 		val inputData = MemoryStream[Events]
> 		inputData.addData((1L to 100L).map(i => new Events(i)))
> 		val events = inputData.toDF()
> 		  .withColumn("w1", expr("0"))
> 		  .withColumn("x1", expr("0"))
> 		  .withColumn("y1", expr("0"))
> 		  .withColumn("z1", expr("0"))
> 		val numberOfSelects = 40 // set to 100+ and the planning takes forever
> 		val dfWithSelectsExpr = (2 to numberOfSelects).foldLeft(events)((df,i) =>{
> 			val arr = df.columns.++(Array(s"w${i-1} + rand() as w$i", s"x${i-1} + rand() as x$i", s"y${i-1} + 2 as y$i", s"z${i-1} + 1 as z$i"))
> 			df.selectExpr(arr:_*)
> 		})
> 		val withJoinAndFilter = dfWithSelectsExpr
> 		  .join(lookupTable, expr("productId = pid"))
> 		  .filter("productId < 50")
> 		val query = withJoinAndFilter.writeStream
> 		  .outputMode("append")
> 		  .format("console")
> 		  .trigger(ProcessingTime(2000))
> 		  .start()
> 		query.processAllAvailable()
> 		spark.stop()
> 	}
> }
> {code}
> The query progress output will show:
> {code:java}
> "durationMs" : {
>     "addBatch" : 1310,
>     "getBatch" : 6,
>     "getOffset" : 0,
>     "queryPlanning" : 36924,
>     "triggerExecution" : 38297,
>     "walCommit" : 33
>   }
> {code}
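
For context on the reporter's third suggestion: spark.experimental.extraOptimizations only allows adding Catalyst rules to the optimizer; there is no matching hook for removing built-in ones. A minimal sketch of the existing extension point (the rule shown is a hypothetical no-op, purely to illustrate the API):

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical rule that does nothing; it stands in for a real custom optimization.
object NoopRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

val spark = SparkSession.builder.appName("ExtraOptimizations").master("local[*]").getOrCreate()

// Extra rules can only be appended; built-in rules such as InferFiltersFromConstraints
// cannot be dropped through this interface, which is what the suggestion asks for.
spark.experimental.extraOptimizations ++= Seq(NoopRule)
{code}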


