spark-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Michael Styles <michael.sty...@shopify.com>
Subject New Optimizer Hint
Date Thu, 20 Apr 2017 13:31:05 GMT
Hello,

I am in the process of putting together a PR that introduces a new hint
called NO_COLLAPSE. This hint is essentially identical to Oracle's NO_MERGE
hint.

Let me first give an example of why I am proposing this.

df1 = sc.sql.createDataFrame([(1, "abc")], ["id", "user_agent"])
df2 = df1.withColumn("ua", user_agent_details(df1["user_agent"]))
df3 = df2.select(df2["ua"].device_form_factor.alias("c1"),
df2["ua"].browser_version.alias("c2"))
df3.explain(True)

== Parsed Logical Plan ==
'Project [ua#85[device_form_factor] AS c1#90, ua#85[browser_version] AS
c2#91]
+- Project [id#80L, user_agent#81, UDF(user_agent#81) AS ua#85]
   +- LogicalRDD [id#80L, user_agent#81]

== Analyzed Logical Plan ==
c1: string, c2: string
Project [ua#85.device_form_factor AS c1#90, ua#85.browser_version AS c2#91]
+- Project [id#80L, user_agent#81, UDF(user_agent#81) AS ua#85]
   +- LogicalRDD [id#80L, user_agent#81]

== Optimized Logical Plan ==
Project [UDF(user_agent#81).device_form_factor AS c1#90,
UDF(user_agent#81).browser_version AS c2#91]
+- LogicalRDD [id#80L, user_agent#81]

== Physical Plan ==
*Project [UDF(user_agent#81).device_form_factor AS c1#90,
UDF(user_agent#81).browser_version AS c2#91]
+- Scan ExistingRDD[id#80L,user_agent#81]

user_agent_details is a user-defined function that returns a struct. As can
be seen from the generated query plan, the function is being executed
multiple times which could lead to performance issues. This is due to the
CollapseProject optimizer rule that collapses adjacent projections.

I'm proposing a hint that prevent the optimizer from collapsing adjacent
projections. A new function called 'no_collapse' would be introduced for
this purpose. Consider the following example and generated query plan.

df1 = sc.sql.createDataFrame([(1, "abc")], ["id", "user_agent"])
df2 = F.no_collapse(df1.withColumn("ua",
user_agent_details(df1["user_agent"])))
df3 = df2.select(df2["ua"].device_form_factor.alias("c1"),
df2["ua"].browser_version.alias("c2"))
df3.explain(True)

== Parsed Logical Plan ==
'Project [ua#69[device_form_factor] AS c1#75, ua#69[browser_version] AS
c2#76]
+- NoCollapseHint
   +- Project [id#64L, user_agent#65, UDF(user_agent#65) AS ua#69]
      +- LogicalRDD [id#64L, user_agent#65]

== Analyzed Logical Plan ==
c1: string, c2: string
Project [ua#69.device_form_factor AS c1#75, ua#69.browser_version AS c2#76]
+- NoCollapseHint
   +- Project [id#64L, user_agent#65, UDF(user_agent#65) AS ua#69]
      +- LogicalRDD [id#64L, user_agent#65]

== Optimized Logical Plan ==
Project [ua#69.device_form_factor AS c1#75, ua#69.browser_version AS c2#76]
+- NoCollapseHint
   +- Project [UDF(user_agent#65) AS ua#69]
      +- LogicalRDD [id#64L, user_agent#65]

== Physical Plan ==
*Project [ua#69.device_form_factor AS c1#75, ua#69.browser_version AS
c2#76]
+- *Project [UDF(user_agent#65) AS ua#69]
   +- Scan ExistingRDD[id#64L,user_agent#65]

As can be seen from the query plan, the user-defined function is now
evaluated once per row.

I would like to get some feedback on this proposal.

Thanks.

Mime
View raw message