spark-issues mailing list archives

From "Hyukjin Kwon (JIRA)" <>
Subject [jira] [Commented] (SPARK-26413) SPIP: RDD Arrow Support in Spark Core and PySpark
Date Mon, 24 Dec 2018 04:25:00 GMT


Hyukjin Kwon commented on SPARK-26413:

For clarification, Arrow column vector APIs are exposed under [].
So it is quite feasible for a third party to implement this. For instance, the company
I belong to used this approach to work with the Arrow format (see also
It's feasible to integrate with {{ColumnarBatch}}. 

Since we are very conservative about RDD APIs, I would like to be very sure that we have a strong
reason to add RDD APIs. For instance, is it impossible to use the API I pointed out? Since
Arrow is self-describing and structured, it mostly only makes sense to use it with Spark SQL
within Apache Spark. In that case, wouldn't it make more sense to make the current vector APIs
easier to use?

> SPIP: RDD Arrow Support in Spark Core and PySpark
> -------------------------------------------------
>                 Key: SPARK-26413
>                 URL:
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark, Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Richard Whitcomb
>            Priority: Minor
> h2. Background and Motivation
> Arrow is becoming a standard interchange format for columnar structured data.  This
is already true in Spark with the use of Arrow in the pandas UDF functions in the dataframe
> However, the current implementation of Arrow in Spark is limited to two use cases.
>  * Pandas UDF that allows for operations on one or more columns in the DataFrame API.
>  * Collect as Pandas which pulls back the entire dataset to the driver in a Pandas Dataframe.
> What is still hard, however, is making use of all of the columns in a DataFrame while staying
distributed across the workers.  The only way to do this currently is to drop down into RDDs
and collect the rows into a dataframe. However, pickling is very slow and the collecting is
> The proposal is to extend spark in a way that allows users to operate on an Arrow Table
fully while still making use of Spark's underlying technology.  Some examples of what this
new API would make possible:
>  * Pass the Arrow Table with Zero Copy to PyTorch for predictions.
>  * Pass to Nvidia Rapids for an algorithm to be run on the GPU.
>  * Distribute data across many GPUs making use of the new Barriers API.
> h2. Target users and personas
> ML, Data Scientists, and future library authors.
> h2. Goals
>  * Conversion from any Dataset[Row] or PySpark DataFrame to RDD[Table]
>  * Conversion back from any RDD[Table] to Dataset[Row], RDD[Row], PySpark DataFrame
>  * Open up the possibility of tighter integration between Arrow/Pandas/Spark, especially
at a library level.
> h2. Non-Goals
>  * Not creating a new API but instead using existing APIs.
> h2. Proposed API changes
> h3. Data Objects
> case class ArrowTable(schema: Schema, batches: Iterable[ArrowRecordBatch])
> h3. Dataset.scala
> {code:java}
> // Converts a Dataset to an RDD of Arrow Tables
> // Each RDD row is an Iterable of Arrow Batches.
> def arrowRDD: RDD[ArrowTable]
> // Utility Function to convert to RDD Arrow Table for PySpark
> private[sql] def javaToPythonArrow: JavaRDD[Array[Byte]]
> {code}
> h3. RDD.scala
> {code:java}
>  // Converts RDD[ArrowTable] to a DataFrame by inspecting the Arrow Schema
>  def arrowToDataframe(implicit ev: T <:< ArrowTable): DataFrame
>  // Converts RDD[ArrowTable] to an RDD of Rows
>  def arrowToRDD(implicit ev: T <:< ArrowTable): RDD[Row]{code}
> h3.
> {code:java}
> # Serializer that takes serialized Arrow Tables and returns a pyarrow Table.
> class ArrowSerializer(FramedSerializer)
> {code}
> h3.
> {code}
> # New RDD Class that has an RDD[ArrowTable] behind it and uses the new ArrowSerializer
instead of the normal Pickle Serializer
> class ArrowRDD(RDD){code}
> h3.
> {code}
> # New function that converts a PySpark DataFrame into an ArrowRDD
> def arrow(self):
> {code}
> h2. Example API Usage
> h3. PySpark
> {code}
> # Select a Single Column Using Pandas
> def map_table(arrow_table):
>   import pyarrow as pa
>   pdf = arrow_table.to_pandas()
>   pdf = pdf[['email']]
>   return pa.Table.from_pandas(pdf)
> # Convert to Arrow RDD, map over tables, convert back to dataframe
> {code}
> h3. Scala
> {code:java}
> // Find N Centroids using Cuda Rapids kMeans
> def runCuKmeans(table: ArrowTable, clusters: Int): ArrowTable
> // Convert Dataset[Row] to RDD[ArrowTable] and back to Dataset[Row]
> => runCuKmeans(table, N))
> {code}
> h2. Implementation Details
> As mentioned in the first section, the goal is to make it easier for Spark users to interact
with Arrow tools and libraries.  This however does come with some considerations from a Spark
>  Arrow is column based instead of Row based.  In the above API proposal of RDD[ArrowTable]
each RDD row will in fact be a block of data.  Another proposal in this regard is to introduce
a new parameter to Spark called arrow.sql.execution.arrow.maxRecordsPerTable.  The goal of
this parameter is to decide how many records are included in a single Arrow Table.  If set
to -1 the entire partition will be included in the table; otherwise the table is capped at
that number of records. Within that number the normal blocking mechanisms of Arrow are used
to include multiple batches.  This
is still dictated by arrowMaxRecordsPerBatch.
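
The two-level grouping described above (records per table, then records per batch within a table) can be sketched in plain Python. This is only an illustration of the semantics as worded in the proposal, not Spark or Arrow code: the function name {{split_partition}} and its parameter names are hypothetical, plain lists stand in for Arrow tables and record batches, and -1 is taken to mean "whole partition in one table" as the text says.

```python
from typing import Iterable, List, TypeVar

T = TypeVar("T")


def split_partition(
    records: Iterable[T],
    max_records_per_table: int,
    max_records_per_batch: int,
) -> List[List[List[T]]]:
    """Group one partition's records into tables, each a list of batches.

    Hypothetical helper: lists stand in for Arrow tables and record batches.
    """
    if max_records_per_batch <= 0:
        raise ValueError("max_records_per_batch must be positive")
    if max_records_per_table != -1 and max_records_per_table <= 0:
        raise ValueError("max_records_per_table must be -1 or positive")

    rows = list(records)
    if not rows:
        return []

    # -1 means the entire partition goes into a single table.
    table_size = len(rows) if max_records_per_table == -1 else max_records_per_table

    tables = []
    for start in range(0, len(rows), table_size):
        table_rows = rows[start:start + table_size]
        # Within a table, the per-batch limit still decides the batch boundaries.
        batches = [
            table_rows[i:i + max_records_per_batch]
            for i in range(0, len(table_rows), max_records_per_batch)
        ]
        tables.append(batches)
    return tables
```

With ten records, a table limit of 4 and a batch limit of 3, this yields three tables whose batches hold 3+1, 3+1 and 2 records; with a table limit of -1 the same partition becomes one table.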

This message was sent by Atlassian JIRA
