spark-reviews mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [spark] LucaCanali opened a new pull request #26783: [SPARK-30153][PYTHON][WIP] Extend data exchange options for vectorized UDF functions with vanilla Arrow serialization
Date Fri, 06 Dec 2019 16:37:00 GMT
URL: https://github.com/apache/spark/pull/26783
 
 
    ### What changes were proposed in this pull request?
    Extend the data exchange options for vectorized UDFs by introducing plain Arrow serialization, without conversion to Pandas. This improves performance in certain cases, such as scientific computing on arrays. This WIP proposes a SCALAR_ARROW pandas_udf type as a first, exploratory step in that direction. As shown below, the performance results are quite promising.
   
    ### Why are the changes needed?
    Spark has introduced vectorized UDFs with pandas_udf, which provide a considerable speed-up by reducing serialization and deserialization overhead, where applicable.
    The current implementation of pandas_udf uses Arrow for fast serialization and then Pandas Series (or Pandas DataFrames) for processing.
    In certain cases UDF performance can be improved further by bypassing the conversion to and from Pandas and working on Arrow Tables and Arrays directly, with the help of specialized libraries able to process them (see the sketch below).
    One such case is scientific computing on high energy physics data, where processing of arrays is of key importance.
    A test case using this approach showed a performance improvement of about 3x compared to the equivalent pandas_udf processing, for a UDF based on plain Arrow serialization implemented as a custom-developed extension of pandas_udf. The Arrow data in the test case was processed with the "awkward array" library (https://github.com/scikit-hep/awkward-array).
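    
    As an illustration of the conversion being bypassed, here is a minimal PyArrow-only sketch (not part of this PR; it assumes a recent pyarrow with the pyarrow.compute kernels) contrasting a round trip through Pandas with processing the Arrow data directly:
    ```
    import pyarrow as pa
    import pyarrow.compute as pc

    # An Arrow column of doubles, as it would arrive on the Python worker.
    col = pa.array([1.0, 2.0, 3.0])

    # Current pandas_udf path: Arrow -> Pandas Series -> compute -> back to Arrow.
    s = col.to_pandas()
    result_via_pandas = pa.Array.from_pandas(s * s)

    # Proposed path: operate on the Arrow data directly, no Pandas round trip.
    result_direct = pc.multiply(col, col)
    ```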
   
    ### Does this PR introduce any user-facing change?
    Extension of the pandas_udf types with SCALAR_ARROW (and possibly other "plain Arrow" extensions of the existing pandas_udf types, not yet implemented).
   
    ### How was this patch tested?
    Manually tested at present.
    The "SCALAR_ARROW" pandas_udf test below runs in 21 seconds vs. 62 seconds for the corresponding version with standard "pandas_udf SCALAR" code.
    The test requires the awkward array library (https://pypi.org/project/awkward/).
   ```
   bin/pyspark --master local[2]
   
    df = sql("select cast(1.0 as double) col1, rand(42) col2, Array(rand(42),rand(42),rand(42)) col3 from range(1e8)")
   df.printSchema()
   
   root
    |-- col1: double (nullable = false)
    |-- col2: double (nullable = false)
    |-- col3: array (nullable = false)
    |    |-- element: double (containsNull = false)
   
   
   from pyspark.sql.functions import pandas_udf, PandasUDFType
   from pyspark.sql.types import *
   import awkward
   
    @pandas_udf(ArrayType(DoubleType()), PandasUDFType.SCALAR_ARROW)
    def test_arrow(col1):
        b = awkward.fromarrow(col1.chunk(0))  # Arrow -> awkward JaggedArray
        c = awkward.toarrow(b * b)            # element-wise square, back to Arrow
        return c
   
   
   import time
   start = time.time()
   df.withColumn('test', test_arrow(df.col3)).write.format("noop").mode("overwrite").save()
   end = time.time()
   print(end - start)
   ```
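    
    For reference, the awkward-array part of the UDF body can be exercised standalone (no Spark needed). This is only a sketch of what happens for one Arrow batch, using the awkward 0.x fromarrow/toarrow API as in the UDF above:
    ```
    import pyarrow as pa
    import awkward  # awkward-array 0.x, https://pypi.org/project/awkward/

    # A small jagged Arrow column of doubles, shaped like col3 above.
    col3 = pa.array([[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]])

    b = awkward.fromarrow(col3)   # Arrow list array -> awkward JaggedArray
    c = awkward.toarrow(b * b)    # element-wise square, back to an Arrow array
    print(c)
    ```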
   
    This is the reference code with pandas_udf SCALAR (it runs in 62 seconds vs. 21 seconds for the SCALAR_ARROW version above):
   ```
    @pandas_udf(ArrayType(DoubleType()), PandasUDFType.SCALAR)
    def test_pandas(col1):
        return col1 * col1  # element-wise square on a pandas Series of per-row arrays
   
   import time
   start = time.time()
   df.withColumn('test', test_pandas(df.col3)).write.format("noop").mode("overwrite").save()
   end = time.time()
   print(end - start)
   ```
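    
    The gap between the two versions is expected to come mostly from the Arrow <-> Pandas conversion: with SCALAR the array column arrives as a pandas Series of per-row NumPy arrays, so the data is materialized and processed row by row, while with the proposed SCALAR_ARROW the jagged data stays in Arrow buffers that awkward-array can operate on in bulk.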
   
   Co-authors of this work:
   Luca Canali @LucaCanali 
   Lindsey Gray @lgray
   Jim Pivarski @jpivarski
   

