spark-issues mailing list archives

From "Sean Owen (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-16320) Spark 2.0 slower than 1.6 when querying nested columns
Date Mon, 15 Aug 2016 21:36:22 GMT

    [ https://issues.apache.org/jira/browse/SPARK-16320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15421719#comment-15421719
] 

Sean Owen commented on SPARK-16320:
-----------------------------------

I see. I wonder if this deserves a bit of documentation in the tuning section? You'd be welcome
to collect some of the wisdom here into a note in the docs.

> Spark 2.0 slower than 1.6 when querying nested columns
> ------------------------------------------------------
>
>                 Key: SPARK-16320
>                 URL: https://issues.apache.org/jira/browse/SPARK-16320
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0
>            Reporter: Maciej Bryński
>            Priority: Critical
>         Attachments: spark1.6-ui.png, spark2-ui.png
>
>
> I ran some tests on a Parquet file with many nested columns (about 30 GB in
> 400 partitions), and Spark 2.0 is sometimes slower.
> I tested the following queries:
> 1) {code}select count(*) where id > some_id{code}
> For this query, performance is similar (about 1 sec).
> 2) {code}select count(*) where nested_column.id > some_id{code}
> Spark 1.6 -> 1.6 min
> Spark 2.0 -> 2.1 min
> Should I expect such a drop in performance?
> I don't know how to prepare sample data to show the problem.
> Any ideas? Or is there public data with many nested columns?
> *UPDATE*
> I created a script to generate data and confirm this problem.
> {code}
> #Initialization
> from pyspark import SparkContext, SparkConf
> from pyspark.sql import HiveContext
> from pyspark.sql.functions import struct
> conf = SparkConf()
> conf.set('spark.cores.max', 15)
> conf.set('spark.executor.memory', '30g')
> conf.set('spark.driver.memory', '30g')
> sc = SparkContext(conf=conf)
> sqlctx = HiveContext(sc)
> #Data creation
> MAX_SIZE = 2**32 - 1
> path = '/mnt/mfs/parquet_nested'
> def create_sample_data(levels, rows, path):
>     
>     def _create_column_data(cols):
>         import random
>         random.seed()
>         return {"column{}".format(i): random.randint(0, MAX_SIZE) for i in range(cols)}
>     
>     def _create_sample_df(cols, rows):
>         rdd = sc.parallelize(range(rows))           
>         data = rdd.map(lambda r: _create_column_data(cols))
>         df = sqlctx.createDataFrame(data)
>         return df
>     
>     def _create_nested_data(levels, rows):
>         if len(levels) == 1:
>             return _create_sample_df(levels[0], rows).cache()
>         else:
>             df = _create_nested_data(levels[1:], rows)
>             return df.select([struct(df.columns).alias("column{}".format(i)) for i in range(levels[0])])
>     df = _create_nested_data(levels, rows)
>     df.write.mode('overwrite').parquet(path)
>     
> #Sample data
> create_sample_data([2,10,200], 1000000, path)
> #Query
> df = sqlctx.read.parquet(path)
> %%timeit
> df.where("column1.column5.column50 > {}".format(int(MAX_SIZE / 2))).count()
> {code}
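
To make the shape of the generated data easier to picture, here is a minimal pure-Python sketch (no Spark required) that enumerates the leaf column paths implied by the `levels` argument of `create_sample_data` above. The helper name `leaf_paths` is hypothetical and not part of the reporter's script; the sketch assumes each level wraps copies of the level below it in `column0`, `column1`, ... structs, as the recursion in `_create_nested_data` suggests.

```python
def leaf_paths(levels, prefix=""):
    """Yield the dotted path of every leaf column for a given `levels` list.

    `leaf_paths` is a hypothetical helper for illustration only. It mirrors
    the naming used in the script: every level produces fields named
    column0 .. column{n-1}, and the last entry of `levels` is the number of
    integer leaf columns.
    """
    for i in range(levels[0]):
        name = "{}column{}".format(prefix, i)
        if len(levels) == 1:
            # Innermost level: these are the integer leaf columns.
            yield name
        else:
            # Outer level: a struct wrapping the remaining levels.
            for path in leaf_paths(levels[1:], name + "."):
                yield path


levels = [2, 10, 200]  # same argument as in the script above
paths = list(leaf_paths(levels))

print(len(paths))                           # number of leaf columns
print("column1.column5.column50" in paths)  # the path used in the timed query
```

Under these assumptions, `[2, 10, 200]` gives 2 x 10 x 200 = 4000 leaf columns, and the timed query filters on just one of them.
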
> Results
> Spark 1.6
> 1 loop, best of 3: *1min 5s* per loop
> Spark 2.0
> 1 loop, best of 3: *1min 21s* per loop
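
For reference, the relative slowdown implied by the timings quoted in this report can be worked out with a few lines of Python. This is only arithmetic on the numbers above, not a new measurement; the function name `slowdown_percent` is hypothetical.

```python
def slowdown_percent(baseline, candidate):
    """Return how much slower `candidate` is than `baseline`, in percent."""
    return (candidate - baseline) / baseline * 100.0


# Original report: 1.6 min on Spark 1.6 vs 2.1 min on Spark 2.0.
original_report = slowdown_percent(1.6, 2.1)

# Generated data: 1min 5s (65 s) on Spark 1.6 vs 1min 21s (81 s) on Spark 2.0.
generated_data = slowdown_percent(65, 81)

print("original report: about {:.0f}% slower".format(original_report))
print("generated data:  about {:.0f}% slower".format(generated_data))
```

This works out to roughly 31% for the original query and roughly 25% for the generated data set.
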
> *UPDATE 2*
> The analysis in https://issues.apache.org/jira/browse/SPARK-16321 points to the same source.
> I attached some VisualVM profiles there.
> The most interesting ones are from the queries:
> https://issues.apache.org/jira/secure/attachment/12818785/spark16_query.nps
> https://issues.apache.org/jira/secure/attachment/12818784/spark2_query.nps



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

