spark-dev mailing list archives

From Matt Cheah <mch...@palantir.com>
Subject Re: DataFrames Aggregate does not spill?
Date Tue, 22 Sep 2015 02:21:03 GMT
I was executing on Spark 1.4 so I didn't notice the Tungsten option would
make spilling happen in 1.5. I'll upgrade to 1.5 and see how that turns out.
Thanks!

From:  Reynold Xin <rxin@databricks.com>
Date:  Monday, September 21, 2015 at 5:36 PM
To:  Matt Cheah <mcheah@palantir.com>
Cc:  "dev@spark.apache.org" <dev@spark.apache.org>, Mingyu Kim
<mkim@palantir.com>, Peter Faiman <peterfaiman@palantir.com>
Subject:  Re: DataFrames Aggregate does not spill?

What's the plan if you run explain?

In 1.5 the default should be TungstenAggregate, which does spill (switching
from hash-based aggregation to sort-based aggregation).
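The hash-to-sort switch described above can be sketched in plain Scala. This is an illustrative simulation only, not Spark's actual TungstenAggregate code: `hashThenSortAggregate` and its in-memory size limit are made-up names, and the overflow buffer stands in for what Spark would actually spill to disk.

```scala
import scala.collection.mutable

// Hypothetical sketch of hash-then-sort aggregation: sum values per key in a
// hash map until it reaches a size limit, then fall back to collecting the
// remaining rows and aggregating them in a sorted pass.
def hashThenSortAggregate(rows: Iterator[(String, Long)],
                          maxMapSize: Int): Map[String, Long] = {
  val map = mutable.HashMap.empty[String, Long]
  val overflow = mutable.ArrayBuffer.empty[(String, Long)]
  var spilled = false
  for ((k, v) <- rows) {
    if (!spilled && (map.contains(k) || map.size < maxMapSize)) {
      map(k) = map.getOrElse(k, 0L) + v   // hash-based path
    } else {
      spilled = true                      // Spark would spill to disk here
      overflow += ((k, v))
    }
  }
  // Sort-based pass over the overflowed rows (stands in for merging spills).
  for ((k, v) <- overflow.sortBy(_._1)) map(k) = map.getOrElse(k, 0L) + v
  map.toMap
}
```

The point of the sketch is only that the operator degrades gracefully: once the map is full, new keys no longer grow it, so memory stays bounded at the cost of an extra sorted pass.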

On Mon, Sep 21, 2015 at 5:34 PM, Matt Cheah <mcheah@palantir.com> wrote:
> Hi everyone,
> 
> I'm debugging some slowness and apparent memory pressure + GC issues after I
> ported some workflows from raw RDDs to Data Frames. In particular, I'm looking
> into an aggregation workflow that computes many aggregations per key at once.
> 
> My workflow before was doing a fairly straightforward combineByKey call where
> the aggregation would build up non-trivially sized objects in memory; I was
> computing numerous sums over various fields of the data at a time. In
> particular, I noticed that there was spilling to disk on the map side of the
> aggregation.
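The combineByKey-style approach described above can be sketched locally in plain Scala. Everything here is an illustrative stand-in: `Record`, its fields, and `combineLocally` are hypothetical, and the real `RDD.combineByKey` additionally merges combiners across partitions and spills through ExternalAppendOnlyMap.

```scala
// A record with several numeric fields we want summed per key.
case class Record(key: String, a: Long, b: Long)

// combineByKey's three functions, specialized to "many sums per key":
def createCombiner(r: Record): Array[Long] = Array(r.a, r.b)

def mergeValue(acc: Array[Long], r: Record): Array[Long] = {
  acc(0) += r.a; acc(1) += r.b; acc
}

// Local stand-in for the map-side combine; no cross-partition merge here.
def combineLocally(records: Seq[Record]): Map[String, Array[Long]] =
  records.foldLeft(Map.empty[String, Array[Long]]) { (m, r) =>
    m.get(r.key) match {
      case Some(acc) => m.updated(r.key, mergeValue(acc, r))
      case None      => m.updated(r.key, createCombiner(r))
    }
  }
```

Because the combiner is just a flat array of running sums, each key's state stays small and the framework can spill partial combiners to disk when the map grows.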
> 
> When I switched to using DataFrames aggregation, particularly
> DataFrame.groupBy(some list of keys).agg(exprs) where I passed a large number
> of "Sum" exprs, the execution began to choke. I saw one of my executors had a
> long GC pause and my job wasn't able to recover. However, when I reduced the
> number of Sum expressions being computed in the aggregation, the workflow
> started to work normally.
> 
> I have a hypothesis that I would like to run by everyone. In
> org.apache.spark.sql.execution.Aggregate.scala, branch-1.5, I'm looking at the
> execution of Aggregation
> <https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala>
> , which appears to build
> the aggregation result in memory via updating a HashMap and iterating over the
> rows. However this appears to be less robust than what would happen if
> PairRDDFunctions.combineByKey were to be used. If combineByKey were used, then
> instead of using two mapPartitions calls (assuming the aggregation is
> partially-computable, as Sum is), it would use the ExternalSorter and
> ExternalAppendOnlyMap objects to compute the aggregation. This would allow the
> aggregated result to grow large as some of the aggregated result could be
> spilled to disk. This especially seems bad if the aggregation reduction factor
> is low; that is, if there are many unique keys in the dataset. In particular,
> the Hash Map is holding O(# of keys * number of aggregated results per key)
> items in memory at a time.
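As a back-of-envelope illustration of that footprint (the bytes-per-cell figure below is an assumed constant for fixed-width numeric aggregates, not a measured Spark number):

```scala
// Rough estimate of live aggregation state for an in-memory hash map:
// one cell per (distinct key, aggregate expression) pair.
def estimatedBytes(distinctKeys: Long,
                   aggsPerKey: Int,
                   bytesPerCell: Int = 8): Long =
  distinctKeys * aggsPerKey * bytesPerCell
```

For example, 10 million distinct keys with 50 Sum expressions each is already on the order of 4 GB of aggregate state before any per-entry overhead, which is consistent with a low reduction factor causing GC pressure.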
> 
> I was wondering what everyone's thoughts on this problem are. Did we consciously
> think about the robustness implications when choosing to use an in-memory hash
> map to compute the aggregation? Is this an inherent limitation of the
> aggregation implementation in Data Frames?
> 
> Thanks,
> 
> -Matt Cheah