spark-issues mailing list archives

From "Vinitha Reddy Gankidi (JIRA)" <j...@apache.org>
Subject [jira] [Created] (SPARK-28188) Materialize Dataframe API
Date Thu, 27 Jun 2019 18:20:00 GMT
Vinitha Reddy Gankidi created SPARK-28188:
---------------------------------------------

             Summary: Materialize Dataframe API 
                 Key: SPARK-28188
                 URL: https://issues.apache.org/jira/browse/SPARK-28188
             Project: Spark
          Issue Type: New Feature
          Components: Spark Core
    Affects Versions: 2.4.3
            Reporter: Vinitha Reddy Gankidi


We have added a new API to materialize dataframes, and our internal users have found it very
useful. When you need to run different computations over the same dataframe, Spark recomputes
the dataframe for each action. This is problematic when evaluating the dataframe is expensive.

Materialize is a Spark action: a way to tell Spark explicitly that the dataframe has already
been computed. Once a dataframe is materialized, Spark skips all stages prior to the
materialize whenever the dataframe is reused later on.

Spark may scan the same table twice if two queries load different columns. For example, the
following two queries would scan the same data twice:
{code:java}
import org.apache.spark.sql.functions.{countDistinct, min}

val tab = spark.table("some_table").filter("c LIKE '%match%'")

val num_groups = tab.agg(countDistinct($"a"))

val groups_with_b = tab.groupBy($"a").agg(min($"b") as "min"){code}
 

The same table is scanned twice because, when the first query runs, Spark doesn't know that
column b will be needed later. You can use materialize to load the data once and reuse it:

{code:java}
import org.apache.spark.sql.functions.{countDistinct, min}

val materialized = spark.table("some_table")
  .filter("c LIKE '%match%'")
  .select($"a", $"b")
  .repartition($"a")
  .materialize()

val num_groups = materialized.agg(countDistinct($"a"))

val groups_with_b = materialized.groupBy($"a").agg(min($"b") as "min"){code}
 

The select prunes the columns that don't need to be loaded; without it, Spark doesn't know
that only a and b will be used later.

The example also uses repartition to add a shuffle, because Spark resumes execution from the
last shuffle. In most cases you should repartition the dataframe before materializing it:
repartition introduces a new stage, which is what allows the expensive upstream stages to be
skipped.
h3. Materialize vs Cache:
 * Caching/persisting a dataframe is lazy: the data is kept in memory on the nodes only after
the first action computes it. Materialize, by contrast, is an action. It runs a job that
produces the rows the dataframe represents and returns a new dataframe with the result; when
that result is used, Spark resumes execution from the data in the last shuffle.
 * Because it reuses shuffle data, materialized data is served by the cluster's persistent
shuffle servers rather than by Spark executors, which makes materialize more reliable.
Caching, on the other hand, happens in the executor where the task runs, and the data can be
lost if executors time out from inactivity or run out of memory.
 * Since materialize is more reliable and uses fewer resources than cache, it is usually the
better choice for batch workloads. For processing that iterates over a dataset many times,
however, it is better to keep the data in memory using cache or persist.
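The lazy-versus-eager distinction above can be sketched without Spark at all, in plain Scala: a `lazy val` behaves like cache/persist (nothing runs until first use, and the value is then reused), while a plain `val` behaves like the proposed materialize (computed eagerly, up front). This is only an illustrative analogy; the object and method names below are hypothetical.

```scala
object LazyVsEagerSketch {
  var runs = 0 // counts how many times the "expensive job" actually runs

  def expensiveJob(): Int = { runs += 1; 42 }

  def main(args: Array[String]): Unit = {
    // cache()/persist() analogue: lazy, computed on first use, then reused
    lazy val cached = expensiveJob()
    assert(runs == 0)            // declaring it runs nothing

    val a = cached + cached      // first use triggers the computation once
    assert(runs == 1 && a == 84) // second use reuses the stored value

    // materialize() analogue: an eager action, computed right away
    val materialized = expensiveJob()
    assert(runs == 2)            // ran immediately, before any downstream use

    val b = materialized + materialized
    assert(runs == 2 && b == 84) // downstream uses reuse the result
  }
}
```

The analogy also shows the failure mode the bullets describe: a `lazy val` re-evaluates if its holder is lost and rebuilt, much as cached partitions are recomputed when an executor dies, whereas materialized shuffle output survives the executor.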



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

