spark-issues mailing list archives

From "DB Tsai (JIRA)" <j...@apache.org>
Subject [jira] [Created] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures
Date Mon, 09 Oct 2017 23:01:00 GMT
DB Tsai created SPARK-22231:
-------------------------------

             Summary: Support of map, filter, withColumn, dropColumn in nested list of structures
                 Key: SPARK-22231
                 URL: https://issues.apache.org/jira/browse/SPARK-22231
             Project: Spark
          Issue Type: New Feature
          Components: SQL
    Affects Versions: 2.2.0
            Reporter: DB Tsai


At Netflix's algorithm team, we work on ranking problems to find great content that fulfills the unique tastes of our members. Before building a recommendation algorithm, we need to prepare the training, testing, and validation datasets in Apache Spark. Due to the nature of ranking problems, we have a nested list of items to be ranked in one column, and the top level is the context describing the setting in which a model is to be used (e.g. profile, country, time, device, etc.). Here is a blog post describing the details: [Distributed Time Travel for Feature Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907]

To be more concrete, for ranking videos for a given profile_id in a given country, our data schema looks like this:
{code:java}
root
 |-- profile_id: long (nullable = true)
 |-- country_iso_code: string (nullable = true)
 |-- items: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- title_id: integer (nullable = true)
 |    |    |-- scores: double (nullable = true)
...
{code}

We often need to work at the nested level by applying functions to it, or by dropping or adding columns inside the nested structure. Currently, there is no easy way in open source Apache Spark to perform those operations using SQL primitives; many people we talked to simply convert the data into an RDD to work on the nested level, and then reconstruct a new dataframe. This is extremely inefficient: optimizations such as predicate pushdown in SQL cannot be performed, we cannot leverage the columnar format, and the serialization and deserialization cost becomes huge even when we just want to add a single column at the nested level.
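For illustration, here is a minimal sketch of that round-trip through a typed Dataset/RDD, using hypothetical case classes that mirror the schema above (the class and field names are assumptions for this example only):
{code:java}
import spark.implicits._

case class RankedItem(title_id: Int, scores: Double)
case class RankingContext(profile_id: Long, country_iso_code: String, items: Seq[RankedItem])

// Leave the optimized columnar plan, rewrite the nested items on the JVM side,
// and rebuild a new dataframe. Every row is fully deserialized and re-serialized
// just to bump each score by 1.0.
val rewritten = df.as[RankingContext]
  .map(ctx => ctx.copy(items = ctx.items.map(i => i.copy(scores = i.scores + 1.0))))
  .toDF()
{code}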

We built a solution internally at Netflix that we're very happy with. We plan to contribute it to upstream Spark as open source, and we would like to socialize the API design to see if we have missed any use cases.

The first API we added is *mapItems* on a dataframe, which takes a function from *Column* to *Column* and applies it to the nested dataframe. Here is an example:
{code:java}
case class Data(foo: Int, bar: Double, items: Seq[Double])

val df: Dataset[Data] = spark.createDataset(Seq(
  Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
  Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
))

val result = df.mapItems("items") {
  item => item * 2.0
}

result.printSchema()
// root
// |-- foo: integer (nullable = false)
// |-- bar: double (nullable = false)
// |-- items: array (nullable = true)
// |    |-- element: double (containsNull = true)

result.show()
// +---+----+--------------------+
// |foo| bar|               items|
// +---+----+--------------------+
// | 10|10.0|[20.2, 20.4, 20.6...|
// | 20|20.0|[40.2, 40.4, 40.6...|
// +---+----+--------------------+
{code}
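Since we want to socialize the API design, here is a rough sketch of the signature implied by the example above (the placement as an implicit extension class and the exact signature are assumptions, not the final design):
{code:java}
import org.apache.spark.sql.{Column, DataFrame}

// Hypothetical extension: mapItems takes the name of an array column and a
// function rewriting each element, expressed as a Column-to-Column transform.
implicit class NestedItemOps(df: DataFrame) {
  def mapItems(colName: String)(f: Column => Column): DataFrame = ???
}
{code}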

Now, with the ability to apply a function to the nested dataframe, we can add a new method, *withColumn*, on *Column* to add a column or replace an existing column of the same name in the nested list of structs. Here are two examples demonstrating the API together with *mapItems*; the first one replaces an existing column:
{code:java}
case class Item(a: Int, b: Double)

case class Data(foo: Int, bar: Double, items: Seq[Item])

val df: Dataset[Data] = spark.createDataset(Seq(
  Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
  Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
))

val result = df.mapItems("items") {
  item => item.withColumn(item("b") + 1 as "b")
}

result.printSchema()
// root
// |-- foo: integer (nullable = false)
// |-- bar: double (nullable = false)
// |-- items: array (nullable = true)
// |    |-- element: struct (containsNull = true)
// |    |    |-- a: integer (nullable = true)
// |    |    |-- b: double (nullable = true)

result.show(false)
// +---+----+----------------------+
// |foo|bar |items                 |
// +---+----+----------------------+
// |10 |10.0|[[10,11.0], [11,12.0]]|
// |20 |20.0|[[20,21.0], [21,22.0]]|
// +---+----+----------------------+
{code}
and the second one adds a new column to the nested dataframe:
{code:java}
val df: Dataset[Data] = spark.createDataset(Seq(
  Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
  Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
))

val result = df.mapItems("items") {
  item => item.withColumn(item("b") + 1 as "c")
}

result.printSchema()
// root
// |-- foo: integer (nullable = false)
// |-- bar: double (nullable = false)
// |-- items: array (nullable = true)
// |    |-- element: struct (containsNull = true)
// |    |    |-- a: integer (nullable = true)
// |    |    |-- b: double (nullable = true)
// |    |    |-- c: double (nullable = true)

result.show(false)
// +---+----+--------------------------------+
// |foo|bar |items                           |
// +---+----+--------------------------------+
// |10 |10.0|[[10,10.0,11.0], [11,11.0,12.0]]|
// |20 |20.0|[[20,20.0,21.0], [21,21.0,22.0]]|
// +---+----+--------------------------------+
{code}

We also implement a filter predicate on the nested list of structs, which returns only the items that match the predicate. The following is an example of the API:
{code:java}
val df: Dataset[Data] = spark.createDataset(Seq(
  Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
  Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
))

val result = df.filterItems("items") {
  item => item("a") < 20
}

result.show(false)
// +---+----+----------------------+
// |foo|bar |items                 |
// +---+----+----------------------+
// |10 |10.0|[[10,10.0], [11,11.0]]|
// |20 |20.0|[]                    |
// +---+----+----------------------+
{code}

Dropping a column in the nested list of structs can be achieved with an API similar to *withColumn*. We add a *drop* method to *Column* to implement this. Here is an example:
{code:java}
val df: Dataset[Data] = spark.createDataset(Seq(
  Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
  Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
))

val result = df.mapItems("items") {
  item => item.drop("b")
}

result.printSchema()
// root
// |-- foo: integer (nullable = false)
// |-- bar: double (nullable = false)
// |-- items: array (nullable = true)
// |    |-- element: struct (containsNull = true)
// |    |    |-- a: integer (nullable = true)

result.show(false)
// +---+----+------------+
// |foo|bar |items       |
// +---+----+------------+
// |10 |10.0|[[10], [11]]|
// |20 |20.0|[[20], [21]]|
// +---+----+------------+
{code}

As you can see, these APIs are not opaque to the Spark optimizer, so they can fully take advantage of the columnar data format.
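As a hypothetical illustration (it uses the proposed API above, so it is not runnable on open source Spark today): because the nested rewrite is expressed with *Column* expressions rather than an RDD round-trip, a filter on a top-level column can still reach a columnar source such as Parquet.
{code:java}
// Hypothetical: the proposed mapItems combined with a top-level filter.
val result = df
  .mapItems("items") { item => item.withColumn(item("b") + 1 as "c") }
  .filter($"foo" > 10)

// With a Parquet-backed df, result.explain() should still show the filter on
// `foo` pushed down to the scan; with the RDD workaround sketched earlier,
// every row would have to be deserialized before the filter could run.
{code}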

We're looking forward to the community's feedback and suggestions! Thanks.


