spark-issues mailing list archives

From "Franz (Jira)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-30552) Chained spark column expressions with distinct windows specs produce inefficient DAG
Date Fri, 17 Jan 2020 14:20:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-30552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Franz updated SPARK-30552:
--------------------------
    Environment: 
python : 3.6.9.final.0
python-bits : 64
OS : Windows
OS-release : 10
machine : AMD64
processor : Intel64 Family 6 Model 158 Stepping 10, GenuineIntel

pyspark: 2.4.4

pandas : 0.25.3
numpy : 1.17.4

pyarrow : 0.15.1

  was:
INSTALLED VERSIONS
------------------
commit : None
python : 3.6.9.final.0
python-bits : 64
OS : Windows
OS-release : 10
machine : AMD64
processor : Intel64 Family 6 Model 158 Stepping 10, GenuineIntel
byteorder : little
LC_ALL : None
LANG : de_DE.UTF-8
LOCALE : None.None

pandas : 0.25.3
numpy : 1.17.4
pytz : 2019.3
dateutil : 2.8.1
pip : 19.3.1
setuptools : 41.6.0.post20191030
Cython : None
pytest : 5.3.0
hypothesis : None
sphinx : 2.2.1
blosc : None
feather : None
xlsxwriter : None
lxml.etree : None
html5lib : None
pymysql : None
psycopg2 : None
jinja2 : 2.10.3
IPython : 7.11.1
pandas_datareader: None
bs4 : None
bottleneck : None
fastparquet : None
gcsfs : None
lxml.etree : None
matplotlib : None
numexpr : None
odfpy : None
openpyxl : None
pandas_gbq : None
pyarrow : 0.15.1
pytables : None
s3fs : None
scipy : None
sqlalchemy : None
tables : None
xarray : None
xlrd : None
xlwt : None
xlsxwriter : None


> Chained spark column expressions with distinct windows specs produce inefficient DAG
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-30552
>                 URL: https://issues.apache.org/jira/browse/SPARK-30552
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Spark Core
>    Affects Versions: 2.4.4
>         Environment: python : 3.6.9.final.0
> python-bits : 64
> OS : Windows
> OS-release : 10
> machine : AMD64
> processor : Intel64 Family 6 Model 158 Stepping 10, GenuineIntel
> pyspark: 2.4.4
> pandas : 0.25.3
> numpy : 1.17.4
> pyarrow : 0.15.1
>            Reporter: Franz
>            Priority: Major
>
> h2. Context
> Let's say you deal with time series data. Your desired outcome relies on multiple window functions with distinct window specifications. The result may resemble a single Spark column expression, like an identifier for intervals.
> h2. Status Quo
> Usually, I don't store intermediate results with `df.withColumn` but rather chain/stack column expressions and trust Spark to find the most efficient DAG (when working with DataFrames).
> h2. Reproducible example
> However, in the following example (PySpark 2.4.4 standalone), storing an intermediate result with `df.withColumn` reduces the DAG complexity. Let's consider the following test setup:
> {code:python}
> import pandas as pd
> import numpy as np
> from pyspark.sql import SparkSession, Window
> from pyspark.sql import functions as F
> spark = SparkSession.builder.getOrCreate()
> dfp = pd.DataFrame(
>     {
>         "col1": np.random.randint(0, 5, size=100),
>         "col2": np.random.randint(0, 5, size=100),
>         "col3": np.random.randint(0, 5, size=100),
>         "col4": np.random.randint(0, 5, size=100),        
>     }
> )
> df = spark.createDataFrame(dfp)
> df.show(5)
> +----+----+----+----+
> |col1|col2|col3|col4|
> +----+----+----+----+
> |   1|   2|   4|   1|
> |   0|   2|   3|   0|
> |   2|   0|   1|   0|
> |   4|   1|   1|   2|
> |   1|   3|   0|   4|
> +----+----+----+----+
> only showing top 5 rows
> {code}
> The computation is arbitrary. Basically, we have 2 window specs and 3 computational steps. The 3 computational steps depend on each other and use alternating window specs:
> {code:python}
> w1 = Window.partitionBy("col1").orderBy("col2")
> w2 = Window.partitionBy("col3").orderBy("col4")
> # first step, arbitrary window func over 1st window
> step1 = F.lag("col3").over(w1)
> # second step, arbitrary window func over 2nd window with step 1
> step2 = F.lag(step1).over(w2)
> # third step, arbitrary window func over 1st window with step 2
> step3 = F.when(step2 > 1, F.max(step2).over(w1))
> df_result = df.withColumn("result", step3)
> {code}
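Following the reporter's reasoning, the number of shuffles this chain should need equals the number of runs of identical window specs in the step sequence: w1, w2, w1 forms three runs, hence three exchanges. A minimal sketch of that count, assuming the input is not already partitioned by the first spec's keys and treating each spec as an opaque label (so it does not model specs that share partition keys but differ only in ordering). `expected_exchanges` is a hypothetical helper, not a Spark API:

```python
from itertools import groupby


def expected_exchanges(step_specs):
    """Count the shuffles a chain of dependent window steps should need.

    Assumptions: each element of ``step_specs`` labels the window spec used
    by one step, in evaluation order; the input is not pre-partitioned; and
    consecutive steps over the same spec can share one Exchange.
    """
    # groupby() yields one group per run of consecutive equal labels.
    return sum(1 for _label, _run in groupby(step_specs))


# step1 over w1, step2 over w2, step3 over w1
print(expected_exchanges(["w1", "w2", "w1"]))  # 3
```

Reordering the steps so that equal specs are adjacent (where the data dependencies allow it) would lower this count, e.g. `["w1", "w1", "w2"]` gives 2.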
> Inspecting the physical plan via `df_result.explain()` reveals 4 exchanges and 4 sorts! However, only 3 should be necessary here, because we change the window spec only twice.
> {code:python}
> df_result.explain()
> == Physical Plan ==
> *(7) Project [col1#0L, col2#1L, col3#2L, col4#3L, CASE WHEN (_we0#25L > 1) THEN _we1#26L END AS result#22L]
> +- Window [lag(_w0#23L, 1, null) windowspecdefinition(col3#2L, col4#3L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we0#25L], [col3#2L], [col4#3L ASC NULLS FIRST]
>    +- *(6) Sort [col3#2L ASC NULLS FIRST, col4#3L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(col3#2L, 200)
>          +- *(5) Project [col1#0L, col2#1L, col3#2L, col4#3L, _w0#23L, _we1#26L]
>             +- Window [max(_w1#24L) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS _we1#26L], [col1#0L], [col2#1L ASC NULLS FIRST]
>                +- *(4) Sort [col1#0L ASC NULLS FIRST, col2#1L ASC NULLS FIRST], false, 0
>                   +- Exchange hashpartitioning(col1#0L, 200)
>                      +- *(3) Project [col1#0L, col2#1L, col3#2L, col4#3L, _w0#23L, _w1#24L]
>                         +- Window [lag(_w0#27L, 1, null) windowspecdefinition(col3#2L, col4#3L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _w1#24L], [col3#2L], [col4#3L ASC NULLS FIRST]
>                            +- *(2) Sort [col3#2L ASC NULLS FIRST, col4#3L ASC NULLS FIRST], false, 0
>                               +- Exchange hashpartitioning(col3#2L, 200)
>                                  +- Window [lag(col3#2L, 1, null) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _w0#27L, lag(col3#2L, 1, null) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _w0#23L], [col1#0L], [col2#1L ASC NULLS FIRST]
>                                     +- *(1) Sort [col1#0L ASC NULLS FIRST, col2#1L ASC NULLS FIRST], false, 0
>                                        +- Exchange hashpartitioning(col1#0L, 200)
>                                           +- Scan ExistingRDD[col1#0L,col2#1L,col3#2L,col4#3L]
> {code}
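To compare plans like this one without counting nodes by eye, the operator labels can be tallied from the text that `explain()` prints. A minimal sketch in plain Python, assuming the unwrapped tree layout shown here (one operator per line, after indentation, an optional `+- ` marker and an optional `*(n)` whole-stage-codegen prefix). `count_plan_nodes` is a hypothetical helper, not part of PySpark:

```python
import re
from collections import Counter

# Operator names worth comparing between plans (assumption: these are the
# labels Spark prints for shuffle, sort, window and projection nodes).
DEFAULT_OPERATORS = ("Exchange", "Sort", "Window", "Project")

# A node line: indentation, an optional "+- " tree marker, an optional
# whole-stage-codegen prefix such as "*(4) ", then the operator name.
NODE_RE = re.compile(r"^\s*(?:\+-\s*)?(?:\*\(\d+\)\s*)?(\w+)")


def count_plan_nodes(plan_text, operators=DEFAULT_OPERATORS):
    """Tally the given operators in physical-plan text from explain()."""
    counts = Counter()
    for line in plan_text.splitlines():
        match = NODE_RE.match(line)
        if match and match.group(1) in operators:
            counts[match.group(1)] += 1
    return counts


plan = """\
== Physical Plan ==
*(2) Sort [col1#0L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(col1#0L, 200)
   +- Scan ExistingRDD[col1#0L]"""
print(count_plan_nodes(plan))
```

Fed the plan above, this should report 4 each of `Exchange`, `Sort` and `Window`; the `tmp_variable` plan in this issue has 3 of each.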
> h2. Improvement
> To get a better DAG, we slightly modify the code to store the column expression of `step2` with `withColumn` and just pass a reference to this column. The new physical plan indeed requires only 3 shuffles!
> {code:python}
> w1 = Window.partitionBy("col1").orderBy("col2")
> w2 = Window.partitionBy("col3").orderBy("col4")
> # first step, arbitrary window func
> step1 = F.lag("col3").over(w1)
> # second step, arbitrary window func over 2nd window with step 1
> step2 = F.lag(step1).over(w2)
> # save temporary
> df = df.withColumn("tmp_variable", step2)
> step2 = F.col("tmp_variable")
> # third step, arbitrary window func over 1st window with step 2
> step3 = F.when(step2 > 1, F.max(step2).over(w1))
> df_result = df.withColumn("result", step3).drop("tmp_variable")
> df_result.explain()
> == Physical Plan ==
> *(5) Project [col1#0L, col2#1L, col3#2L, col4#3L, CASE WHEN (tmp_variable#33L > 1) THEN _we0#42L END AS result#41L]
> +- Window [max(tmp_variable#33L) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS _we0#42L], [col1#0L], [col2#1L ASC NULLS FIRST]
>    +- *(4) Sort [col1#0L ASC NULLS FIRST, col2#1L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(col1#0L, 200)
>          +- *(3) Project [col1#0L, col2#1L, col3#2L, col4#3L, tmp_variable#33L]
>             +- Window [lag(_w0#34L, 1, null) windowspecdefinition(col3#2L, col4#3L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS tmp_variable#33L], [col3#2L], [col4#3L ASC NULLS FIRST]
>                +- *(2) Sort [col3#2L ASC NULLS FIRST, col4#3L ASC NULLS FIRST], false, 0
>                   +- Exchange hashpartitioning(col3#2L, 200)
>                      +- Window [lag(col3#2L, 1, null) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _w0#34L], [col1#0L], [col2#1L ASC NULLS FIRST]
>                         +- *(1) Sort [col1#0L ASC NULLS FIRST, col2#1L ASC NULLS FIRST], false, 0
>                            +- Exchange hashpartitioning(col1#0L, 200)
>                               +- Scan ExistingRDD[col1#0L,col2#1L,col3#2L,col4#3L]
> {code}
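To check the shuffle count programmatically (for example in a regression test that guards against this kind of plan blow-up), the plan text can be captured instead of printed. A minimal sketch, assuming `DataFrame.explain()` writes the plan with Python's `print()` (PySpark's implementation does), so `contextlib.redirect_stdout` can capture it. `explain_to_string` and `exchange_count` are hypothetical helpers:

```python
import contextlib
import io


def explain_to_string(df):
    """Return the plan text that df.explain() would print.

    Assumption: explain() writes via Python's print(), as PySpark's
    DataFrame.explain does, so redirecting sys.stdout captures it.
    """
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        df.explain()
    return buffer.getvalue()


def exchange_count(df):
    """Count shuffle (Exchange) operators in df's physical plan."""
    plan = explain_to_string(df)
    # Strip tree indentation and "+- " markers before checking the label.
    return sum(1 for line in plan.splitlines()
               if line.lstrip(" +-").startswith("Exchange"))
```

With the two plans in this issue, `exchange_count(df_result)` would return 4 for the chained expression and 3 for the `tmp_variable` version. The helpers only rely on `explain()` printing to stdout, so they can be exercised with any stand-in object that has such a method.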
> h2. Relevance
> My original example was even more complex and resulted in an even greater difference in the DAG (up to 10 times slower on real-world data).
> h2. Question
> Does anyone have an explanation for this odd behavior? I thought that stacking/chaining column expressions is best practice, since it allows Spark to optimize intermediate steps most effectively (in contrast to creating references for intermediate results).
> Thanks in advance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
