spark-commits mailing list archives

Subject spark git commit: [SPARK-22221][DOCS] Adding User Documentation for Arrow
Date Mon, 29 Jan 2018 18:25:48 GMT
Repository: spark
Updated Branches:
  refs/heads/master 2d903cf9d -> 0d60b3213

[SPARK-22221][DOCS] Adding User Documentation for Arrow

## What changes were proposed in this pull request?

Adding user facing documentation for working with Arrow in Spark

Author: Bryan Cutler <>
Author: Li Jin <>
Author: hyukjinkwon <>

Closes #19575 from BryanCutler/arrow-user-docs-SPARK-2221.


Branch: refs/heads/master
Commit: 0d60b3213fe9a7ae5e9b208639f92011fdb2ca32
Parents: 2d903cf
Author: Bryan Cutler <>
Authored: Mon Jan 29 10:25:25 2018 -0800
Committer: gatorsmile <>
Committed: Mon Jan 29 10:25:25 2018 -0800

 docs/         | 134 ++++++++++++++++++++++++++++-
 examples/src/main/python/sql/ | 129 +++++++++++++++++++++++++++
 2 files changed, 262 insertions(+), 1 deletion(-)
diff --git a/docs/ b/docs/
index 502c0a8..d49c8d8 100644
--- a/docs/
+++ b/docs/
@@ -1640,6 +1640,138 @@ Configuration of Hive is done by placing your `hive-site.xml`, `core-site.xml`
 You may run `./bin/spark-sql --help` for a complete list of all available options.
+# PySpark Usage Guide for Pandas with Apache Arrow
+
+## Apache Arrow in Spark
+
+Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer
+data between JVM and Python processes. This currently is most beneficial to Python users that
+work with Pandas/NumPy data. Its usage is not automatic and might require some minor
+changes to configuration or code to take full advantage and ensure compatibility. This guide will
+give a high-level description of how to use Arrow in Spark and highlight any differences when
+working with Arrow-enabled data.
+
+### Ensure PyArrow Installed
+
+If you install PySpark using pip, then PyArrow can be brought in as an extra dependency of the
+SQL module with the command `pip install pyspark[sql]`. Otherwise, you must ensure that PyArrow
+is installed and available on all cluster nodes. The current supported version is 0.8.0.
+You can install it using pip or conda from the conda-forge channel. See PyArrow
+[installation]( for details.
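
As a quick way to confirm the prerequisite above, a minimal sketch (not part of this commit) that checks PyArrow is importable on a node; the install commands in the comment mirror the options mentioned in the text:

```python
# Quick sanity check that PyArrow is importable on a node; install it first with
# `pip install pyspark[sql]` or `conda install -c conda-forge pyarrow`.
import pyarrow

# The guide states 0.8.0 is the currently supported version
print("PyArrow version: %s" % pyarrow.__version__)
```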
+
+## Enabling for Conversion to/from Pandas
+
+Arrow is available as an optimization when converting a Spark DataFrame to a Pandas DataFrame
+using the call `toPandas()` and when creating a Spark DataFrame from a Pandas DataFrame with
+`createDataFrame(pandas_df)`. To use Arrow when executing these calls, users need to first set
+the Spark configuration 'spark.sql.execution.arrow.enabled' to 'true'. This is disabled by default.
+
+<div class="codetabs">
+<div data-lang="python" markdown="1">
+{% include_example dataframe_with_arrow python/sql/ %}
+</div>
+</div>
+
+Using the above optimizations with Arrow will produce the same results as when Arrow is not
+enabled. Note that even with Arrow, `toPandas()` results in the collection of all records in the
+DataFrame to the driver program and should be done on a small subset of the data. Not all
+data types are currently supported and an error can be raised if a column has an unsupported type,
+see [Supported Types](#supported-sql-arrow-types). If an error occurs during `createDataFrame()`,
+Spark will fall back to create the DataFrame without Arrow.
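
For readers who want the gist without opening the example file added later in this diff, a condensed sketch of the round trip described above (the app name is illustrative):

```python
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("arrow-conversion-sketch").getOrCreate()

# Enable Arrow-based columnar transfers (off by default)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Pandas -> Spark: createDataFrame(pandas_df) can use Arrow
pdf = pd.DataFrame(np.random.rand(100, 3))
df = spark.createDataFrame(pdf)

# Spark -> Pandas: toPandas() can use Arrow; it still collects all rows to the driver
result_pdf = df.toPandas()
```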
+
+## Pandas UDFs (a.k.a. Vectorized UDFs)
+
+Pandas UDFs are user defined functions that are executed by Spark using Arrow to transfer data and
+Pandas to work with the data. A Pandas UDF is defined using the keyword `pandas_udf` as a decorator
+or to wrap the function, no additional configuration is required. Currently, there are two types of
+Pandas UDF: Scalar and Group Map.
+
+### Scalar
+
+Scalar Pandas UDFs are used for vectorizing scalar operations. They can be used with functions such
+as `select` and `withColumn`. The Python function should take `pandas.Series` as inputs and return
+a `pandas.Series` of the same length. Internally, Spark will execute a Pandas UDF by splitting
+columns into batches and calling the function for each batch as a subset of the data, then
+concatenating the results together.
+
+The following example shows how to create a scalar Pandas UDF that computes the product of 2 columns.
+
+<div class="codetabs">
+<div data-lang="python" markdown="1">
+{% include_example scalar_pandas_udf python/sql/ %}
+</div>
+</div>
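
The guide notes that `pandas_udf` can also be used as a decorator; the bundled example uses the wrapping form, so here is a small sketch of the decorator form (the `"long"` return type, data, and app name are illustrative choices, not part of the commit):

```python
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.appName("scalar-decorator-sketch").getOrCreate()
df = spark.createDataFrame(pd.DataFrame({"x": [1, 2, 3]}))

# Decorator form: the return type ("long" here) is passed to pandas_udf directly
@pandas_udf("long")
def multiply(a, b):
    # a and b arrive as pandas.Series; the result must be a Series of the same length
    return a * b

df.select(multiply(df["x"], df["x"])).show()
```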
+
+### Group Map
+
+Group map Pandas UDFs are used with `groupBy().apply()` which implements the "split-apply-combine" pattern.
+Split-apply-combine consists of three steps:
+* Split the data into groups by using `DataFrame.groupBy`.
+* Apply a function on each group. The input and output of the function are both `pandas.DataFrame`. The
+  input data contains all the rows and columns for each group.
+* Combine the results into a new `DataFrame`.
+
+To use `groupBy().apply()`, the user needs to define the following:
+* A Python function that defines the computation for each group.
+* A `StructType` object or a string that defines the schema of the output `DataFrame`.
+
+Note that all data for a group will be loaded into memory before the function is applied. This can
+lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for
+[maxRecordsPerBatch](#setting-arrow-batch-size) is not applied on groups and it is up to the user
+to ensure that the grouped data will fit into the available memory.
+
+The following example shows how to use `groupBy().apply()` to subtract the mean from each value in the group.
+
+<div class="codetabs">
+<div data-lang="python" markdown="1">
+{% include_example group_map_pandas_udf python/sql/ %}
+</div>
+</div>
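
Since the output schema may be given either as a DDL string (as in the bundled example) or as a `StructType`, a brief sketch of the `StructType` variant (data and app name are illustrative):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

spark = SparkSession.builder.appName("group-map-schema-sketch").getOrCreate()
df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ("id", "v"))

# Same computation as the example referenced above, but with the output schema
# given as a StructType object instead of a DDL string
schema = StructType([
    StructField("id", LongType()),
    StructField("v", DoubleType()),
])

@pandas_udf(schema, PandasUDFType.GROUP_MAP)
def subtract_mean(pdf):
    # pdf is a pandas.DataFrame holding all rows and columns for one group
    return pdf.assign(v=pdf.v - pdf.v.mean())

df.groupBy("id").apply(subtract_mean).show()
```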
+
+For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf).
+
+## Usage Notes
+
+### Supported SQL Types
+
+Currently, all Spark SQL data types are supported by Arrow-based conversion except `MapType`,
+`ArrayType` of `TimestampType`, and nested `StructType`.
+
+### Setting Arrow Batch Size
+
+Data partitions in Spark are converted into Arrow record batches, which can temporarily lead to
+high memory usage in the JVM. To avoid possible out of memory exceptions, the size of the Arrow
+record batches can be adjusted by setting the conf "spark.sql.execution.arrow.maxRecordsPerBatch"
+to an integer that will determine the maximum number of rows for each batch. The default value is
+10,000 records per batch. If the number of columns is large, the value should be adjusted
+accordingly. Using this limit, each data partition will be made into 1 or more record batches for
+processing.
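
A minimal sketch of adjusting that limit at runtime; the value 5,000 and the app name are only illustrative choices, not recommendations from this commit:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("arrow-batch-size-sketch").getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Cap each Arrow record batch at 5,000 rows (default is 10,000); a lower value can
# help when DataFrames have many columns and per-batch memory is a concern.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")
```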
+
+### Timestamp with Time Zone Semantics
+
+Spark internally stores timestamps as UTC values, and timestamp data that is brought in without
+a specified time zone is converted as local time to UTC with microsecond resolution. When timestamp
+data is exported or displayed in Spark, the session time zone is used to localize the timestamp
+values. The session time zone is set with the configuration 'spark.sql.session.timeZone' and will
+default to the JVM system local time zone if not set. Pandas uses a `datetime64` type with nanosecond
+resolution, `datetime64[ns]`, with optional time zone on a per-column basis.
+
+When timestamp data is transferred from Spark to Pandas it will be converted to nanoseconds
+and each column will be converted to the Spark session time zone then localized to that time
+zone, which removes the time zone and displays values as local time. This will occur
+when calling `toPandas()` or `pandas_udf` with timestamp columns.
+
+When timestamp data is transferred from Pandas to Spark, it will be converted to UTC microseconds. This
+occurs when calling `createDataFrame` with a Pandas DataFrame or when returning a timestamp from a
+`pandas_udf`. These conversions are done automatically to ensure Spark will have data in the
+expected format, so it is not necessary to do any of these conversions yourself. Any nanosecond
+values will be truncated.
+
+Note that a standard UDF (non-Pandas) will load timestamp data as Python datetime objects, which is
+different than a Pandas timestamp. It is recommended to use Pandas time series functionality when
+working with timestamps in `pandas_udf`s to get the best performance, see
+[here]( for details.
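
A short sketch tying these semantics together; the time zone, timestamp value, and app name are illustrative only:

```python
import datetime
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("arrow-timezone-sketch").getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# The session time zone controls how the UTC-stored timestamps are localized;
# "America/Los_Angeles" is only an example value.
spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")

pdf = pd.DataFrame({"ts": [datetime.datetime(2018, 1, 29, 10, 25)]})

# Pandas -> Spark converts to UTC microseconds (nanosecond values would be truncated)
df = spark.createDataFrame(pdf)

# Spark -> Pandas converts to nanoseconds and renders values in the session time zone
print(df.toPandas())
```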
 # Migration Guide
 ## Upgrading From Spark SQL 2.2 to 2.3
@@ -1788,7 +1920,7 @@ options.
    Note that, for <b>DecimalType(38,0)*</b>, the table above intentionally does not cover all other combinations of scales and precisions because currently we only infer decimal type like `BigInteger`/`BigInt`. For example, 1.1 is inferred as double type.
  - In PySpark, now we need Pandas 0.19.2 or upper if you want to use Pandas related functionalities, such as `toPandas`, `createDataFrame` from Pandas DataFrame, etc.
  - In PySpark, the behavior of timestamp values for Pandas related functionalities was changed to respect session timezone. If you want to use the old behavior, you need to set a configuration `spark.sql.execution.pandas.respectSessionTimeZone` to `False`. See [SPARK-22395]( for details.
-  - In PySpark, `na.fill()` or `fillna` also accepts boolean and replaces nulls with booleans. In prior Spark versions, PySpark just ignores it and returns the original Dataset/DataFrame.
+  - In PySpark, `na.fill()` or `fillna` also accepts boolean and replaces nulls with booleans. In prior Spark versions, PySpark just ignores it and returns the original Dataset/DataFrame.
  - Since Spark 2.3, when either broadcast hash join or broadcast nested loop join is applicable, we prefer to broadcasting the table that is explicitly specified in a broadcast hint. For details, see the section [Broadcast Hint](#broadcast-hint-for-sql-queries) and [SPARK-22489](
  - Since Spark 2.3, when all inputs are binary, `functions.concat()` returns an output as binary. Otherwise, it returns as a string. Until Spark 2.3, it always returns as a string despite of input types. To keep the old behavior, set `spark.sql.function.concatBinaryAsString` to `true`.
  - Since Spark 2.3, when all inputs are binary, SQL `elt()` returns an output as binary. Otherwise, it returns as a string. Until Spark 2.3, it always returns as a string despite of input types. To keep the old behavior, set `spark.sql.function.eltOutputAsString` to `true`.
diff --git a/examples/src/main/python/sql/ b/examples/src/main/python/sql/
new file mode 100644
index 0000000..6c0028b
--- /dev/null
+++ b/examples/src/main/python/sql/
@@ -0,0 +1,129 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+A simple example demonstrating Arrow in Spark.
+Run with:
+  ./bin/spark-submit examples/src/main/python/sql/
+"""
+
+from __future__ import print_function
+
+from pyspark.sql import SparkSession
+from pyspark.sql.utils import require_minimum_pandas_version, require_minimum_pyarrow_version
+
+# Fail fast if compatible Pandas/PyArrow versions are not installed
+require_minimum_pandas_version()
+require_minimum_pyarrow_version()
+
+
+def dataframe_with_arrow_example(spark):
+    # $example on:dataframe_with_arrow$
+    import numpy as np
+    import pandas as pd
+    # Enable Arrow-based columnar data transfers
+    spark.conf.set("spark.sql.execution.arrow.enabled", "true")
+    # Generate a Pandas DataFrame
+    pdf = pd.DataFrame(np.random.rand(100, 3))
+    # Create a Spark DataFrame from a Pandas DataFrame using Arrow
+    df = spark.createDataFrame(pdf)
+    # Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
+    result_pdf ="*").toPandas()
+    # $example off:dataframe_with_arrow$
+    print("Pandas DataFrame result statistics:\n%s\n" % str(result_pdf.describe()))
+
+
+def scalar_pandas_udf_example(spark):
+    # $example on:scalar_pandas_udf$
+    import pandas as pd
+    from pyspark.sql.functions import col, pandas_udf
+    from pyspark.sql.types import LongType
+    # Declare the function and create the UDF
+    def multiply_func(a, b):
+        return a * b
+    multiply = pandas_udf(multiply_func, returnType=LongType())
+    # The function for a pandas_udf should be able to execute with local Pandas data
+    x = pd.Series([1, 2, 3])
+    print(multiply_func(x, x))
+    # 0    1
+    # 1    4
+    # 2    9
+    # dtype: int64
+    # Create a Spark DataFrame, 'spark' is an existing SparkSession
+    df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
+    # Execute function as a Spark vectorized UDF
+"x"), col("x"))).show()
+    # +-------------------+
+    # |multiply_func(x, x)|
+    # +-------------------+
+    # |                  1|
+    # |                  4|
+    # |                  9|
+    # +-------------------+
+    # $example off:scalar_pandas_udf$
+
+
+def group_map_pandas_udf_example(spark):
+    # $example on:group_map_pandas_udf$
+    from pyspark.sql.functions import pandas_udf, PandasUDFType
+    df = spark.createDataFrame(
+        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+        ("id", "v"))
+    @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)
+    def subtract_mean(pdf):
+        # pdf is a pandas.DataFrame
+        v = pdf.v
+        return pdf.assign(v=v - v.mean())
+    df.groupby("id").apply(subtract_mean).show()
+    # +---+----+
+    # | id|   v|
+    # +---+----+
+    # |  1|-0.5|
+    # |  1| 0.5|
+    # |  2|-3.0|
+    # |  2|-1.0|
+    # |  2| 4.0|
+    # +---+----+
+    # $example off:group_map_pandas_udf$
+
+
+if __name__ == "__main__":
+    spark = SparkSession \
+        .builder \
+        .appName("Python Arrow-in-Spark example") \
+        .getOrCreate()
+    print("Running Pandas to/from conversion example")
+    dataframe_with_arrow_example(spark)
+    print("Running pandas_udf scalar example")
+    scalar_pandas_udf_example(spark)
+    print("Running pandas_udf group map example")
+    group_map_pandas_udf_example(spark)
+    spark.stop()
